When working with the combination of Confluent Schema Registry and Apache Kafka, you may have noticed that pushing messages with different Avro schemas to one topic was not possible. Starting with Confluent Schema Registry version 4.1.0 it is, and in this post I will explain how.
First, let me explain your options for multiple schemas. Two new configuration options were added: key.subject.name.strategy (which defines how to construct the subject name for message keys) and value.subject.name.strategy (which defines how to construct the subject name for message values). In this post I will concentrate on value.subject.name.strategy, but the same technique applies to key.subject.name.strategy. These two options can be set to the following values (a short sketch after the list illustrates the subject names each one produces):
- io.confluent.kafka.serializers.subject.TopicNameStrategy (default): The subject name for message keys is {topic}-key, and {topic}-value for message values. This means that the schemas of all messages in the topic must be compatible with each other.
- io.confluent.kafka.serializers.subject.RecordNameStrategy: The subject name is the fully-qualified name of the Avro record type of the message. Thus, the schema registry checks the compatibility for a particular record type, regardless of topic. This setting allows any number of different event types in the same topic.
- io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: The subject name is {topic}-{type}, where {topic} is the Kafka topic name, and {type} is the fully-qualified name of the Avro record type of the message. This setting also allows any number of event types in the same topic, and further constrains the compatibility check to the current topic only.
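To make the difference concrete, here is a small illustrative snippet of my own (not part of the original project; the topic and record names are simply the ones used later in this post) showing the subject name each strategy would produce for an io.karengryg.User value published to mykafkatopic:

import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

import java.util.Properties;

public class SubjectNameStrategyExample {

    public static void main(String[] args) {
        //Subject names for an io.karengryg.User value published to "mykafkatopic":
        //  TopicNameStrategy (default) -> "mykafkatopic-value"
        //  RecordNameStrategy          -> "io.karengryg.User"
        //  TopicRecordNameStrategy     -> "mykafkatopic-io.karengryg.User"
        Properties properties = new Properties();
        properties.setProperty("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
        System.out.println(properties.getProperty("value.subject.name.strategy"));
    }
}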
Enough words, let’s dive into the implementation of this new feature. First and foremost, we need to set up the infrastructure. There is a super useful Docker image available which comes with Apache Kafka, Kafka Connect, ZooKeeper, a UI, Confluent Schema Registry and a REST Proxy – more info: Fast Data Dev.
Run in terminal:
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
  -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
  landoop/fast-data-dev:latest
Once the Docker container is up and running, open http://localhost:3030.
You will see a nice UI:
Click on “Kafka Topics UI” and a new view will appear:
In this view, we can see a list of topic names (created by default); if you click on one of them, it will show you its content.
But for now, let’s concentrate on the actual code.
Project structure:
KafkaAvroCustomProducer.java
package io.karengryg.multischematopic;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.io.ClassPathResource;

import java.io.IOException;
import java.util.Properties;

public class KafkaAvroCustomProducer {

    private static final String TOPIC = "mykafkatopic";

    public static void main(String[] args) throws IOException {
        //quick and dirty hack to set up log4j, don't do it in production!
        org.apache.log4j.BasicConfigurator.configure();

        //Kafka producer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        //Set value for new property
        properties.setProperty("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());

        //Create user avro message
        GenericRecord userAvroPayload = createUserAvroPayload();
        //Create movie avro message
        GenericRecord movieAvroPayload = createMovieAvroPayload();

        //Create kafka producer and set properties
        Producer<String, GenericRecord> producer = new KafkaProducer<>(properties);

        //Create 2 kafka messages
        ProducerRecord<String, GenericRecord> userAvroRecord = new ProducerRecord<>(TOPIC, userAvroPayload);
        ProducerRecord<String, GenericRecord> movieAvroRecord = new ProducerRecord<>(TOPIC, movieAvroPayload);

        //Send both messages to kafka
        producer.send(movieAvroRecord);
        producer.send(userAvroRecord);

        producer.flush();
        producer.close();
    }

    private static GenericRecord createMovieAvroPayload() throws IOException {
        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/movie-v1.avsc").getInputStream());
        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);
        //Populate avro message
        avroMessage.put("movie_name", "Casablanca");
        avroMessage.put("genre", "Drama/Romance");
        return avroMessage;
    }

    private static GenericRecord createUserAvroPayload() throws IOException {
        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/user-v1.avsc").getInputStream());
        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);
        //Populate avro message
        avroMessage.put("first_name", "Karen");
        avroMessage.put("last_name", "Grygoryan");
        return avroMessage;
    }
}
Out of all this code, our main point of interest is the line where we set value.subject.name.strategy to TopicRecordNameStrategy.class.getName(). We are saying that the values in our Kafka topic may have multiple, completely different schemas. With createUserAvroPayload() and createMovieAvroPayload() I create two different Avro messages (User and Movie), and later in the code I push them to the same topic.
movie-v1.avsc
{ "type": "record", "namespace": "io.karengryg", "name": "Movie", "version": "1", "fields": [ { "name": "movie_name", "type": "string", "doc": "Name of Movie" }, { "name": "genre", "type": "string", "doc": "Genre of Movie" } ] }
user-v1.avsc
{ "type": "record", "namespace": "io.karengryg", "name": "User", "version": "1", "fields": [ { "name": "first_name", "type": "string", "doc": "First Name of User" }, { "name": "last_name", "type": "string", "doc": "Last Name of User" } ] }
build.gradle
buildscript {
    repositories {
        maven { url 'https://plugins.gradle.org/m2/' }
        jcenter()
        mavenCentral()
    }
}

plugins {
    id 'java'
    id 'maven'
}

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "http://packages.confluent.io/maven/" }
}

dependencies {
    //I added spring here, because it gives me easy to use ClassPathResource.class
    //which I use to read avro schema from resources folder, but you can omit it
    //and use standard java api for reading files.
    compile 'org.springframework:spring-core:5.0.8.RELEASE'
    compile 'io.confluent:kafka-avro-serializer:4.1.1'
    testCompile 'junit:junit:4.12'
}
Everything is set, so we can safely run our main class.
Extract from the logger, which shows that I’m registering two different schemas for the same Kafka topic, each under its own subject:
...
816 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Movie\",\"namespace\":\"io.karengryg\",\"fields\":[{\"name\":\"movie_name\",\"type\":\"string\",\"doc\":\"Name of Movie\"},{\"name\":\"genre\",\"type\":\"string\",\"doc\":\"Genre of Movie\"}],\"version\":\"1\"}"} to http://127.0.0.1:8081/subjects/mykafkatopic-io.karengryg.Movie/versions
969 [main] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"io.karengryg\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\",\"doc\":\"First Name of User\"},{\"name\":\"last_name\",\"type\":\"string\",\"doc\":\"Last Name of User\"}],\"version\":\"1\"}"} to http://127.0.0.1:8081/subjects/mykafkatopic-io.karengryg.User/versions
...
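If you want to double-check from code what was registered, a small sketch like the one below can list the subjects. This is my own illustration, not from the original post; it assumes the CachedSchemaRegistryClient shipped with the kafka-avro-serializer dependency and the getAllSubjects() call it exposes, plus the registry from the fast-data-dev container running on port 8081.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class ListSubjectsExample {

    public static void main(String[] args) throws Exception {
        //Assumption: Schema Registry from the fast-data-dev container is reachable locally
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://127.0.0.1:8081", 10);
        //Should print the two subjects created above:
        //mykafkatopic-io.karengryg.Movie and mykafkatopic-io.karengryg.User
        client.getAllSubjects().forEach(System.out::println);
    }
}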
Finally, we can open “Kafka Topics UI” and see our payload under our topic name:
In this example, I applied multiple schemas only to the value of the message and left the key empty for simplicity, but you can do the same for the key, as the short sketch below shows. For more info please read here.
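A minimal sketch of the key-side configuration (my own illustration, not part of the original project): these lines would go into the same Properties block as in KafkaAvroCustomProducer above, assuming you also want the key serialized as Avro.

//Serialize the key as Avro and give key schemas their own subject per record type,
//e.g. "mykafkatopic-io.karengryg.User" under TopicRecordNameStrategy.
properties.setProperty("key.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("key.subject.name.strategy", TopicRecordNameStrategy.class.getName());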
The question is, how can this data be read by a consumer…
Exactly!
Exactly, how do you consume a multiple-schema topic?
The link below explains the purpose of a multiple-schema topic:
https://www.confluent.io/blog/put-several-event-types-kafka-topic/
A consumer example would be great. What is a good pattern for dealing with the dynamic type casting into strongly typed Avro objects in a consumer?
Thank you. But how can we read a multi-schema topic? This is a particular problem I’m having right now in Spark (Abris lib).
Actually, I’m looking for a consumer example too. I have a topic with multiple schemas; if an example could be provided, that would be great.
How about using Protobuf schemas?
Can anyone share a hands-on example using a Protobuf schema?
Thankfully topics do not support schemas so the question of one or more schemas is not relevant.
Publishers are free to publish anything to any topic. This is problematic for consumers so usually the producer and consumer agree on some means of type resolution. The simplest approach is to publish a single structure to each topic. That satisfies the majority of cases.
Enter the schema registry. If we add a schema ID to the message, then we can formalize the contract between the producer and the consumer (still comfortably outside of the Kafka topic, as it should be). Confluent enforced the simple approach of a one-to-one relationship between topic and schema. That violates the original intent of topics: publishers are free to publish anything to any topic.
Supporting multiple schemas per topic (again, topics themselves are oblivious to the notion of a schema) fixes a bug in the Confluent Schema Registry. Of course a topic can have different data structures; that was the whole point.
The “simple” one-to-one approach (one topic, one schema) is problematic for multi-step transactions with a series of messages of varying type. Those messages need to preserve order, so they need to be published to the same topic. Schema IDs simplify processing: consumers can easily switch on the schema ID to deserialize different types (a minimal consumer sketch follows this comment).
So multiple schemas per topic is awesome, easy to use, and does not interfere with the very liberal contract between Kafka brokers and Kafka clients. It’s up to producers and consumers to manage message type resolution, and with the help of the schema registry that gets a lot simpler.
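Picking up on the switch-on-type idea from the comment above, here is a minimal consumer sketch (my own illustration, not from the original post; it assumes the same project setup and local services as the producer) that reads mykafkatopic as GenericRecord and branches on each record’s fully-qualified schema name:

package io.karengryg.multischematopic;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaAvroMultiSchemaConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "multi-schema-demo");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("mykafkatopic"));
            ConsumerRecords<String, GenericRecord> records = consumer.poll(5000);
            for (ConsumerRecord<String, GenericRecord> record : records) {
                GenericRecord value = record.value();
                //Branch on the fully-qualified record name written by the producer
                switch (value.getSchema().getFullName()) {
                    case "io.karengryg.Movie":
                        System.out.println("Movie: " + value.get("movie_name"));
                        break;
                    case "io.karengryg.User":
                        System.out.println("User: " + value.get("first_name"));
                        break;
                    default:
                        System.out.println("Unknown record type: " + value.getSchema().getFullName());
                }
            }
        }
    }
}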
If you’re using spring-kafka, you can use a class-level @KafkaListener with multiple @KafkaHandler methods, each of them taking the generated class extending SpecificRecord as its method parameter.
You’ll also need to configure your application with “spring.kafka.properties.specific.avro.reader: true”, unless you want to work with the GenericData.Record class and deserialize it yourself.
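To illustrate that spring-kafka approach, here is a rough sketch of mine, assuming a recent spring-kafka version, that User and Movie are the classes generated from the Avro schemas in the post, and that the Avro deserializer is configured as described above; the class, package, and group names are just placeholders:

import io.karengryg.Movie;
import io.karengryg.User;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

//Class-level @KafkaListener dispatching to one @KafkaHandler per payload type.
@Component
@KafkaListener(topics = "mykafkatopic", groupId = "multi-schema-demo")
public class MultiSchemaListener {

    @KafkaHandler
    public void handleUser(User user) {
        System.out.println("Got user: " + user);
    }

    @KafkaHandler
    public void handleMovie(Movie movie) {
        System.out.println("Got movie: " + movie);
    }

    //Fallback for any record type without a dedicated handler.
    @KafkaHandler(isDefault = true)
    public void handleUnknown(Object record) {
        System.out.println("Got unknown record: " + record);
    }
}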