Kafka Test Suite (Java)

One of the recurring issues I ran into with Kafka was integration testing in Java. I searched the web for a library that would spin up not only a Kafka broker but also the Confluent Schema Registry. I didn't find one, so I went to kafka-streams-examples and extracted the classes needed to run a Kafka cluster programmatically.

You can see the result of my work here; feel free to use it or adapt it to your needs. The next question is how to use it. You will need to clone the project locally and install it into your local Maven repository:

./gradlew clean build install

After you successfully run this command, the library will be available as a Gradle/Maven dependency in your project; note the mavenLocal() entry in the repositories block below, which is what lets Gradle resolve the locally installed snapshot. The code examples below show how to use the library to write an integration test against Kafka with the Schema Registry.

Project structure

build.gradle

plugins {
    id 'java'
}

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation('org.apache.kafka:kafka-clients:1.1.0')
    implementation('org.springframework:spring-core:5.0.8.RELEASE')
    implementation('org.springframework:spring-beans:5.0.8.RELEASE')
    //add kafka-test-suite to your project
    testImplementation('io.karengryg:kafka-test-suite:0.0.2-SNAPSHOT')
    testImplementation('junit:junit:4.12')
}

user-v1.avsc

{
     "type": "record",
     "namespace": "io.karengryg",
     "name": "User",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of User" },
       { "name": "last_name", "type": "string", "doc": "Last Name of User" }
     ]
}
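
For reference, the schema parses to an Avro record with two required string fields. A quick way to sanity-check the file outside of a test is to feed it straight to the Avro parser (a minimal sketch; it assumes the file sits under src/test/resources/avro/, which is where the test below loads it from):

import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;

public class SchemaCheck {

    public static void main(String[] args) throws IOException {
        //load user-v1.avsc from the classpath and let the Avro parser validate it
        try (InputStream in = SchemaCheck.class.getResourceAsStream("/avro/user-v1.avsc")) {
            Schema schema = new Schema.Parser().parse(in);
            System.out.println(schema.getFullName()); //io.karengryg.User
            System.out.println(schema.getFields());   //first_name, last_name
        }
    }
}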

KafkaClusterTest.java

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.karengryg.kafkatestsuite.kafka.EmbeddedSingleNodeKafkaCluster;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class KafkaClusterTest {
    private final String TOPIC = "my-topic";
    private EmbeddedSingleNodeKafkaCluster kafkaCluster;
    private Properties consumerProps;
    private Properties producerProps;

    @Before
    public void setUp() throws Exception {
        //initialize kafka cluster(broker, zookeeper, schema registry)
        kafkaCluster = new EmbeddedSingleNodeKafkaCluster();
        kafkaCluster.start();

        consumerProps = new Properties();
        //set kafka cluster brokers
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        //set schema registry url
        consumerProps.put("schema.registry.url", kafkaCluster.schemaRegistryUrl());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerProps.put("auto.offset.reset", "earliest");

        producerProps = new Properties();
        //set kafka cluster brokers
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        //set schema registry url
        producerProps.put("schema.registry.url", kafkaCluster.schemaRegistryUrl());
    }

    @After
    public void tearDown() {
        kafkaCluster.stop();
    }

    @Test
    public void canSendAndReceiveMessage() throws IOException {

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProps)) {
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC, createUserAvroPayload());
            producer.send(record);
            //flush so the record is on the broker before we start consuming
            producer.flush();
        }

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            ConsumerRecords<String, GenericRecord> messages = consumer.poll(5000);

            messages.forEach(r -> System.out.println(r.value().toString()));
        }
    }

    private static GenericRecord createUserAvroPayload() throws IOException {

        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/user-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("first_name", "Karen");
        avroMessage.put("last_name", "Grygoryan");

        return avroMessage;
    }
}
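
Starting the broker, ZooKeeper, and Schema Registry in @Before means every test method pays the full startup cost. Upstream in kafka-streams-examples, EmbeddedSingleNodeKafkaCluster extends JUnit's ExternalResource; if the extracted class keeps that inheritance (an assumption worth verifying against the library source), you can register one cluster as a @ClassRule and share it across all tests in the class. A minimal sketch:

import io.karengryg.kafkatestsuite.kafka.EmbeddedSingleNodeKafkaCluster;
import org.junit.ClassRule;
import org.junit.Test;

public class SharedClusterTest {

    //assumes EmbeddedSingleNodeKafkaCluster extends org.junit.rules.ExternalResource;
    //JUnit then starts the cluster once before the first test and stops it after the last
    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

    @Test
    public void clusterIsRunning() {
        System.out.println("brokers: " + CLUSTER.bootstrapServers());
        System.out.println("schema registry: " + CLUSTER.schemaRegistryUrl());
    }
}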

After the test executes successfully, you will see the message in the console:

...
{"first_name": "Karen", "last_name": "Grygoryan"}
...
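
Printing the record is handy for a first run, but a real test should assert on the payload instead. A minimal sketch of the same round trip with assertions (the expected values match those set in createUserAvroPayload(); add import static org.junit.Assert.assertEquals; to the test class):

    @Test
    public void receivedUserMatchesSentUser() throws IOException {

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>(TOPIC, createUserAvroPayload()));
            producer.flush();
        }

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            ConsumerRecords<String, GenericRecord> messages = consumer.poll(5000);
            assertEquals(1, messages.count());

            //Avro deserializes string fields as Utf8, so compare via toString()
            GenericRecord user = messages.iterator().next().value();
            assertEquals("Karen", user.get("first_name").toString());
            assertEquals("Grygoryan", user.get("last_name").toString());
        }
    }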
