Multi schemas in one Kafka topic

When working with the combination of Confluent Schema Registry and Apache Kafka, you may have noticed that pushing messages with different Avro schemas to one topic was not possible. Starting with Confluent Schema Registry version 4.1.0, it is, and in this post I will explain how.

First, let me explain your options for multiple schemas. Two new configuration options were added: key.subject.name.strategy (which defines how to construct the subject name for message keys) and value.subject.name.strategy (which defines how to construct the subject name for message values). In this post I will concentrate on value.subject.name.strategy, but the same technique applies to key.subject.name.strategy. These two new configurations can be set to the following values:

  • io.confluent.kafka.serializers.subject.TopicNameStrategy (default): The subject name for message keys is {topic}-key, and {topic}-value for message values. This means that the schemas of all messages in the topic must be compatible with each other.
  • io.confluent.kafka.serializers.subject.RecordNameStrategy: The subject name is the fully-qualified name of the Avro record type of the message. Thus, the schema registry checks the compatibility for a particular record type, regardless of topic. This setting allows any number of different event types in the same topic.
  • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: The subject name is {topic}-{type}, where {topic} is the Kafka topic name, and {type} is the fully-qualified name of the Avro record type of the message. This setting also allows any number of event types in the same topic, and further constrains the compatibility check to the current topic only.
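To make the differences concrete, here is a small stdlib-only sketch that computes the subject name each strategy would produce for a value schema. The class and method names are my own for illustration, not part of the Confluent API:

```java
// Illustrative sketch: subject names produced by each value.subject.name.strategy.
// Class and method names here are made up for the example, not the Confluent API.
public class SubjectNames {

    // TopicNameStrategy (default): subject is "<topic>-value" for message values
    static String topicName(String topic) {
        return topic + "-value";
    }

    // RecordNameStrategy: subject is the fully-qualified Avro record name
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: subject is "<topic>-<fully-qualified record name>"
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "mykafkatopic";
        String record = "io.karengryg.Movie";
        System.out.println(topicName(topic));               // mykafkatopic-value
        System.out.println(recordName(record));             // io.karengryg.Movie
        System.out.println(topicRecordName(topic, record)); // mykafkatopic-io.karengryg.Movie
    }
}
```

Note how only the first strategy ties all messages in a topic to a single subject, which is why it forces every message's schema to be mutually compatible.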

Enough words, let’s dive into the implementation of this new feature. First and foremost, we need to set up the infrastructure. There is a super useful Docker image available which comes with Apache Kafka, Kafka Connect, ZooKeeper, a UI, Confluent Schema Registry and REST Proxy – for more info, see Fast Data Dev.

Run in terminal:

 docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
       -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
       landoop/fast-data-dev:latest

Once the Docker container is up and running, open http://localhost:3030.
You will see a nice UI:

Click on “Kafka Topics UI” and a new view will appear:

In this view, we can see a list of topic names (created by default); if you click on one of them, it will show you its content.
But for now, let’s concentrate on the actual code.

Project structure:

KafkaAvroCustomProducer.java


package io.karengryg.multischematopic;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.io.ClassPathResource;

import java.io.IOException;
import java.util.Properties;

public class KafkaAvroCustomProducer {

    private static final String TOPIC = "mykafkatopic";

    public static void main(String[] args) throws IOException {
        //quick and dirty hack to set up log4j, don't do it in production!
        org.apache.log4j.BasicConfigurator.configure();
        //Kafka producer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        //Set value for new property
        properties.setProperty("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());

        //Create user avro message
        GenericRecord userAvroPayload = createUserAvroPayload();

        //Create movie avro message
        GenericRecord movieAvroPayload = createMovieAvroPayload();

        //Create kafka producer and set properties
        Producer<String, GenericRecord> producer = new KafkaProducer<>(properties);

        //Create 2 kafka messages
        ProducerRecord<String, GenericRecord> userAvroRecord = new ProducerRecord<>(TOPIC, userAvroPayload);
        ProducerRecord<String, GenericRecord> movieAvroRecord = new ProducerRecord<>(TOPIC, movieAvroPayload);

        //Send both messages to kafka
        producer.send(movieAvroRecord);
        producer.send(userAvroRecord);
        
        producer.flush();
        producer.close();
    }

    private static GenericRecord createMovieAvroPayload() throws IOException {

        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/movie-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("movie_name", "Casablanca");
        avroMessage.put("genre", "Drama/Romance");

        return avroMessage;
    }

    private static GenericRecord createUserAvroPayload() throws IOException {

        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/user-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("first_name", "Karen");
        avroMessage.put("last_name", "Grygoryan");

        return avroMessage;
    }
}

Out of all the code, our main point of interest is the line where we set value.subject.name.strategy to TopicRecordNameStrategy.class.getName(). With this, we are saying that the values in our Kafka topic may have multiple, completely different schemas. In createUserAvroPayload() and createMovieAvroPayload(), I create two different Avro messages (User and Movie), and later in the code I push both of them to the same topic.

movie-v1.avsc

{
     "type": "record",
     "namespace": "io.karengryg",
     "name": "Movie",
     "version": "1",
     "fields": [
       { "name": "movie_name", "type": "string", "doc": "Name of Movie" },
       { "name": "genre", "type": "string", "doc": "Genre of Movie" }
     ]
}

user-v1.avsc

{
     "type": "record",
     "namespace": "io.karengryg",
     "name": "User",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of User" },
       { "name": "last_name", "type": "string", "doc": "Last Name of User" }
     ]
}

build.gradle

buildscript {
    repositories {
        maven { url 'https://plugins.gradle.org/m2/' }
        jcenter()
        mavenCentral()
    }
}

plugins {
    id 'java'
    id 'maven'
}

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "http://packages.confluent.io/maven/" }
}

dependencies {
    //I added spring here, because it gives me easy to use ClassPathResource.class
    //which I use to read avro schema from resources folder, but you can omit it
    //and use standard java api for reading files.
    compile 'org.springframework:spring-core:5.0.8.RELEASE'
    compile 'io.confluent:kafka-avro-serializer:4.1.1'
    testCompile 'junit:junit:4.12'
}
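As the comment in the dependencies block notes, Spring’s ClassPathResource is only a convenience and can be replaced with the standard Java API. A minimal stdlib sketch (class and method names are my own; the resulting String can be passed to Avro’s Schema.Parser().parse(String)):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Sketch: reading an .avsc file from the classpath without Spring.
// Class and method names are illustrative, not from any library.
public class SchemaLoader {

    // Read an entire stream into a UTF-8 String (InputStream.readAllBytes is Java 9+)
    static String readAll(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    // Load a classpath resource such as "avro/movie-v1.avsc"
    static String readResource(String path) throws IOException {
        try (InputStream in = SchemaLoader.class.getClassLoader().getResourceAsStream(path)) {
            if (in == null) {
                throw new IllegalArgumentException("resource not found: " + path);
            }
            return readAll(in);
        }
    }
}
```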

Everything is set, so we can safely run our main class.

Extract from the logger, which shows that I’m registering 2 different schemas in the same Kafka topic, under 2 different subjects:

...
816 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService  - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Movie\",\"namespace\":\"io.karengryg\",\"fields\":[{\"name\":\"movie_name\",\"type\":\"string\",\"doc\":\"Name of Movie\"},{\"name\":\"genre\",\"type\":\"string\",\"doc\":\"Genre of Movie\"}],\"version\":\"1\"}"} to http://127.0.0.1:8081/subjects/mykafkatopic-io.karengryg.Movie/versions
969 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService  - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"io.karengryg\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\",\"doc\":\"First Name of User\"},{\"name\":\"last_name\",\"type\":\"string\",\"doc\":\"Last Name of User\"}],\"version\":\"1\"}"} to http://127.0.0.1:8081/subjects/mykafkatopic-io.karengryg.User/versions
...

Finally, we can open “Kafka Topics UI” and see our payloads under our topic name:

In this example, I applied multiple schemas only to the value of the message and left the key empty for simplicity, but you can do the same for the key. For more info, see the Confluent Schema Registry documentation.
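A natural follow-up question is how to consume such a topic. With KafkaAvroDeserializer, the consumer receives GenericRecord values, and a common approach is to branch on the record’s schema full name (record.getSchema().getFullName()). The Kafka and Avro wiring is omitted here; this stdlib-only sketch (names are mine) shows just the dispatch step, with the schema name passed in as a plain String:

```java
// Sketch of dispatching mixed-schema records by Avro schema full name.
// In a real consumer you would call record.getSchema().getFullName() on the
// GenericRecord returned by KafkaAvroDeserializer; here it is a plain String.
public class RecordDispatcher {

    static String dispatch(String schemaFullName) {
        switch (schemaFullName) {
            case "io.karengryg.Movie":
                return "handled movie";
            case "io.karengryg.User":
                return "handled user";
            default:
                return "unknown record type: " + schemaFullName;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("io.karengryg.Movie")); // handled movie
        System.out.println(dispatch("io.karengryg.User"));  // handled user
    }
}
```

Keeping an explicit default branch matters in practice: with TopicRecordNameStrategy, new record types can appear in the topic without any change on the consumer side.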
