I want to share a few useful code snippets for working with Avro schemas and messages.
On my current project we rely heavily on Avro schemas and Avro messages, which are relatively easy to use and manipulate.
But when your schema gets more complicated, working with it becomes very cumbersome.
I struggled with this a lot and finally, after days of searching the web and testing, came up with some generic code that lets you convert Avro schema -> Avro message -> POJO -> Avro message.
Let’s take the project from this post as a base. I have added a few new dependencies and gradle-avro-plugin, which will help us generate POJOs from the Avro schemas.
Project structure:
build.gradle:
buildscript {
    repositories {
        jcenter()
        mavenCentral()
        maven {
            url 'https://plugins.gradle.org/m2/'
        }
    }
    dependencies {
        //plugin needed to generate POJO from Avro schema
        classpath("com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.2")
    }
}

apply plugin: "maven"
apply plugin: "java"
//dont forget to add plugin
apply plugin: "com.commercehub.gradle.plugin.avro-base"

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    jcenter()
    mavenCentral()
    maven {
        url "https://repo.spring.io/snapshot"
    }
    maven {
        url "https://repo.spring.io/milestone"
    }
    maven {
        url "http://packages.confluent.io/maven/"
    }
}

dependencies {
    compile 'org.springframework:spring-core:5.0.8.RELEASE'
    compile 'org.springframework:spring-beans:5.0.8.RELEASE'
    //for gradle-avro-plugin to work correctly need avro version 1.8.2
    compile "org.apache.avro:avro:1.8.2"
    compile 'io.confluent:kafka-avro-serializer:4.1.1'
    testCompile 'junit:junit:4.12'
}

//gradle-avro-plugin setting
avro {
    //convert all string type to "java.lang.CharSequence"
    stringType = "CharSequence"
}

task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
    //point plugin to folder with avro schemas
    source("src/main/resources/avro")
    //point plugin to folder with generated POJOs
    outputDir = file("src/main/java/avro")
}

compileJava.source(generateAvro.outputs)
movie-v1.avsc
{
  "type": "record",
  "namespace": "io.karengryg",
  "name": "Movie",
  "version": "1",
  "fields": [
    { "name": "movie_name", "type": "string", "doc": "Name of Movie" },
    { "name": "genre", "type": "string", "doc": "Genre of Movie" }
  ]
}
user-v1.avsc
{
  "type": "record",
  "namespace": "io.karengryg",
  "name": "User",
  "version": "1",
  "fields": [
    { "name": "first_name", "type": "string", "doc": "First Name of User" },
    { "name": "last_name", "type": "string", "doc": "Last Name of User" }
  ]
}
KafkaAvroCustomProducer.java
package io.karengryg.multischematopic;

import io.karengryg.Movie;
import io.karengryg.User;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.Assert;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KafkaAvroCustomProducer {

    public static void main(String[] args) throws IOException {
        //Create user avro message from avro schema
        GenericRecord userAvroPayload = createUserAvroPayload();
        GenericRecord movieAvroPayload = createMovieAvroPayload();

        //Convert avro message to POJO
        User user = mapRecordToObject(userAvroPayload, new User());
        Movie movie = mapRecordToObject(movieAvroPayload, new Movie());

        System.out.println(movie);
        System.out.println(user);

        //Modify POJO
        user.setFirstName("Leo");
        user.setLastName("Messi");
        movie.setMovieName("Totoro");
        movie.setGenre("anime");

        //Convert back to avro message
        GenericRecord genericRecordUser = pojoToRecord(user);
        GenericRecord genericRecordMovie = pojoToRecord(movie);

        System.out.println("****************");
        System.out.println(genericRecordMovie);
        System.out.println(genericRecordUser);
    }

    private static GenericRecord createMovieAvroPayload() throws IOException {
        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/movie-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("movie_name", "Casablanca");
        avroMessage.put("genre", "Drama/Romance");
        return avroMessage;
    }

    private static GenericRecord createUserAvroPayload() throws IOException {
        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/user-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("first_name", "Karen");
        avroMessage.put("last_name", "Grygoryan");
        return avroMessage;
    }

    private static <T> GenericRecord pojoToRecord(T model) throws IOException {
        Schema schema = ReflectData.get().getSchema(model.getClass());

        ReflectDatumWriter<T> datumWriter = new ReflectDatumWriter<>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        datumWriter.write(model, encoder);
        encoder.flush();

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null);

        return datumReader.read(null, decoder);
    }

    private static <T> T mapRecordToObject(GenericRecord record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");

        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn’t match");

        record.getSchema().getFields().forEach(d ->
                PropertyAccessorFactory.forDirectFieldAccess(object).setPropertyValue(
                        d.name(),
                        record.get(d.name()) == null ? record.get(d.name()) : record.get(d.name()).toString()));

        return object;
    }
}
There is a lot going on, so let us discuss what we are doing here. First, we run in a terminal
./gradlew clean build
so that gradle-avro-plugin generates the POJOs.
Note that these POJOs are auto-generated, so don’t try to add or edit their methods; use them as-is. We then use those POJOs as the base for our conversions (KafkaAvroCustomProducer.java, lines 28-29). The mapRecordToObject() method takes an Avro message (GenericRecord) and a POJO as parameters, copies all values from the Avro message into the POJO, and returns the POJO as the result. After that, I modify the POJOs (KafkaAvroCustomProducer.java, lines 35-38) and then convert the modified POJOs back to Avro messages (KafkaAvroCustomProducer.java, lines 41-42). You can mix and match these conversion methods as needed to produce the required outcome.
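To make the "copy by field name" step easier to see in isolation, here is a minimal, dependency-free sketch of the same idea. It is not the code from the project: a plain Map stands in for the GenericRecord, java.lang.reflect replaces Spring's direct field access, and the RecordToPojoSketch class, its nested User class, and the mapToObject() name are all made up for illustration.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordToPojoSketch {

    // Hypothetical stand-in for a generated POJO; field names mirror user-v1.avsc.
    static class User {
        CharSequence first_name;
        CharSequence last_name;

        @Override
        public String toString() {
            return "{\"first_name\": \"" + first_name + "\", \"last_name\": \"" + last_name + "\"}";
        }
    }

    // Copies every entry of 'record' (a stand-in for GenericRecord) into the
    // field of 'target' that has the same name, converting non-null values to String.
    static <T> T mapToObject(Map<String, Object> record, T target) {
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            try {
                Field field = target.getClass().getDeclaredField(entry.getKey());
                field.setAccessible(true);
                Object value = entry.getValue();
                // Same null handling as mapRecordToObject(): keep null, otherwise use toString()
                field.set(target, value == null ? null : value.toString());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("No usable field named " + entry.getKey(), e);
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("first_name", "Karen");
        record.put("last_name", "Grygoryan");

        User user = mapToObject(record, new User());
        System.out.println(user);
    }
}
```

Because every non-null value is turned into a String before it is assigned, this sketch only fits fields declared as String or CharSequence; a field of any other type would need its own conversion step.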
Finally, after running the main method, we will see this output in the console:
{"movie_name": "Casablanca", "genre": "Drama/Romance"}
{"first_name": "Karen", "last_name": "Grygoryan"}
****************
{"movie_name": "Totoro", "genre": "anime"}
{"first_name": "Leo", "last_name": "Messi"}
Nice succinct example and explanation. Appreciated
You’re welcome!
Useful utility.
I have used it in my project too.
But what about when an object has nested objects?
Example: a Customer POJO has an Address POJO.
How will mapRecordToObject() method work?
I have been struggling to make this conversion.
If you know, can you please help?
I’ve been using the same approach for deeply nested objects successfully too.
Can you please provide an example of that? Thanks in advance.
I have the same problem with nested objects; the exception is “no matching editors or conversion strategy found”. Could you provide an example with your solution?
Thank you for such a good explanation! I have recently been trying to solve a problem: how do you deserialize a POJO from an Avro message if there are multiple different schemas per topic? From the Avro message you can get the writer’s schema id, but not the reader schema.
My idea is this: on app start, register all reader schemas in a map schemaName -> POJO. Override the default KafkaAvroDeserializer so that, when the schema registry or cache returns the writer’s schema, it also retrieves the reader’s schema from the map by schema name, because they must be the same.
So, maybe you have already solved this task. I would really appreciate any help.
Nice article. Can you also add an example where the Avro schema has a nested list of objects? For example, a movie can have a list of actors, and each actor can have firstName, lastName, etc.
Hello, I would like to know whether this code snippet still works. I want to generate POJOs from an Avro schema.