Avro and POJO conversions (tips for Kafka devs)

I want to share a few useful code snippets for working with Avro schemas and messages.

On my current project we rely heavily on Avro schemas and Avro messages. They are relatively easy to use and manipulate, but as a schema grows more complicated, working with it becomes very cumbersome. After a lot of struggling, days of searching the web, and plenty of experiments, I came up with some generic code that lets you convert Avro schema -> Avro message -> POJO -> Avro message.

Let's take the project from this post as a base. I have added a few new dependencies and the gradle-avro-plugin, which will generate POJOs from our Avro schemas.

Project structure:

build.gradle:

buildscript {
    repositories {
        jcenter()
        mavenCentral()
        maven { url 'https://plugins.gradle.org/m2/' }
    }
    dependencies {
        //plugin needed to generate POJO from Avro schema
        classpath("com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.2")
    }
}


apply plugin: "maven"
apply plugin: "java"
//don't forget to apply the plugin
apply plugin: "com.commercehub.gradle.plugin.avro-base"

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    jcenter()
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
    maven { url "https://packages.confluent.io/maven/" }
}

dependencies {
    compile 'org.springframework:spring-core:5.0.8.RELEASE'
    compile 'org.springframework:spring-beans:5.0.8.RELEASE'
    //gradle-avro-plugin needs Avro 1.8.2 to work correctly
    compile "org.apache.avro:avro:1.8.2"
    compile 'io.confluent:kafka-avro-serializer:4.1.1'
    testCompile 'junit:junit:4.12'
}

//gradle-avro-plugin settings
avro {
    //generate all string fields as "java.lang.CharSequence"
    stringType = "CharSequence"
}

task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
    //point the plugin at the folder with the Avro schemas
    source("src/main/resources/avro")
    //folder where the generated POJOs will be written
    outputDir = file("src/main/java/avro")
}

compileJava.source(generateAvro.outputs)

movie-v1.avsc

{
     "type": "record",
     "namespace": "io.karengryg",
     "name": "Movie",
     "version": "1",
     "fields": [
       { "name": "movie_name", "type": "string", "doc": "Name of Movie" },
       { "name": "genre", "type": "string", "doc": "Genre of Movie" }
     ]
}

user-v1.avsc

{
     "type": "record",
     "namespace": "io.karengryg",
     "name": "User",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of User" },
       { "name": "last_name", "type": "string", "doc": "Last Name of User" }
     ]
}
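After the build, the plugin writes the generated classes into src/main/java/avro under the package given by the schema's namespace (io.karengryg). The exact output depends on the plugin and Avro version, but for user-v1.avsc the generated User class has roughly the following shape; this is a simplified sketch for illustration only (the real generated class also extends org.apache.avro.specific.SpecificRecordBase, carries the embedded Schema, and offers a Builder):

```java
//Simplified sketch of the class gradle-avro-plugin generates from user-v1.avsc.
//The real class lives in package io.karengryg and extends SpecificRecordBase;
//only the field/accessor shape is shown here.
public class User {
    //stringType = "CharSequence" in build.gradle makes the fields CharSequence, not String
    private CharSequence first_name;
    private CharSequence last_name;

    public User() {}

    public CharSequence getFirstName() { return first_name; }
    public void setFirstName(CharSequence value) { this.first_name = value; }

    public CharSequence getLastName() { return last_name; }
    public void setLastName(CharSequence value) { this.last_name = value; }

    @Override
    public String toString() {
        return "{\"first_name\": \"" + first_name + "\", \"last_name\": \"" + last_name + "\"}";
    }
}
```

Note how the Java field names keep the snake_case schema names (first_name) while the accessors are camelCased (getFirstName); both conventions matter for the reflection code below.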

KafkaAvroCustomProducer.java

package io.karengryg.multischematopic;

import io.karengryg.Movie;
import io.karengryg.User;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.Assert;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KafkaAvroCustomProducer {

    public static void main(String[] args) throws IOException {

        //Create user avro message from avro schema
        GenericRecord userAvroPayload = createUserAvroPayload();
        GenericRecord movieAvroPayload = createMovieAvroPayload();
        
        //Convert avro message to POJO
        User user = mapRecordToObject(userAvroPayload, new User());
        Movie movie = mapRecordToObject(movieAvroPayload, new Movie());

        System.out.println(movie);
        System.out.println(user);
        
        //Modify POJO
        user.setFirstName("Leo");
        user.setLastName("Messi");
        movie.setMovieName("Totoro");
        movie.setGenre("anime");

        //Convert back to avro message
        GenericRecord genericRecordUser = pojoToRecord(user);
        GenericRecord genericRecordMovie = pojoToRecord(movie);

        System.out.println("****************");
        System.out.println(genericRecordMovie);
        System.out.println(genericRecordUser);
    }

    private static GenericRecord createMovieAvroPayload() throws IOException {

        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/movie-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("movie_name", "Casablanca");
        avroMessage.put("genre", "Drama/Romance");

        return avroMessage;
    }

    private static GenericRecord createUserAvroPayload() throws IOException {

        //Create schema from .avsc file
        Schema mainSchema = new Schema.Parser().parse(new ClassPathResource("avro/user-v1.avsc").getInputStream());

        //Create avro message with defined schema
        GenericRecord avroMessage = new GenericData.Record(mainSchema);

        //Populate avro message
        avroMessage.put("first_name", "Karen");
        avroMessage.put("last_name", "Grygoryan");

        return avroMessage;
    }

    private static <T> GenericRecord pojoToRecord(T model) throws IOException {
        Schema schema = ReflectData.get().getSchema(model.getClass());

        ReflectDatumWriter<T> datumWriter = new ReflectDatumWriter<>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        datumWriter.write(model, encoder);
        encoder.flush();

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null);

        return datumReader.read(null, decoder);
    }

    private static <T> T mapRecordToObject(GenericRecord record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());

        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        record.getSchema().getFields().forEach(field -> {
            Object value = record.get(field.name());
            //Avro returns strings as org.apache.avro.util.Utf8, so convert non-null values via toString()
            PropertyAccessorFactory.forDirectFieldAccess(object)
                    .setPropertyValue(field.name(), value == null ? null : value.toString());
        });
        return object;
    }
}

There is a lot going on here, so let's walk through it. First, run this in a terminal:

./gradlew clean build 

so that gradle-avro-plugin generates the POJOs:

Note that these POJOs are auto-generated, so don't add or edit their methods; use them as-is. We then use those POJOs as the base of our conversions (KafkaAvroCustomProducer.java lines 28-29). The mapRecordToObject() method takes a POJO and an Avro message (GenericRecord) as parameters, "copies" all values from the Avro message into the POJO, and returns the POJO as the result. After that, I modify the POJO (KafkaAvroCustomProducer.java lines 35-38) and convert the modified POJO back into an Avro message (KafkaAvroCustomProducer.java lines 41-42). You can mix and match these conversion methods as needed to produce the required outcome.
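The heavy lifting in mapRecordToObject() is done by Spring's PropertyAccessorFactory, which writes straight into the POJO's fields by name. Stripped of Spring and Avro, the underlying mechanism is plain field reflection. Here is a minimal stdlib-only sketch of that idea, using a hypothetical Target class and a Map standing in for the GenericRecord (both are illustrations, not part of the project above):

```java
import java.lang.reflect.Field;
import java.util.Map;

public class FieldCopySketch {

    //Hypothetical POJO standing in for a generated Avro class
    static class Target {
        private String first_name;
        private String last_name;

        @Override
        public String toString() { return first_name + " " + last_name; }
    }

    //Copy each entry of the "record" into the matching field of the POJO,
    //mirroring what PropertyAccessorFactory.forDirectFieldAccess() does in mapRecordToObject().
    //Non-null values are toString()-ed, just like the Utf8 -> String conversion in the article.
    static <T> T mapRecordToObject(Map<String, ?> record, T object) throws ReflectiveOperationException {
        for (Map.Entry<String, ?> entry : record.entrySet()) {
            Field field = object.getClass().getDeclaredField(entry.getKey());
            field.setAccessible(true); //direct field access, bypassing setters
            Object value = entry.getValue();
            field.set(object, value == null ? null : value.toString());
        }
        return object;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Target t = mapRecordToObject(Map.of("first_name", "Karen", "last_name", "Grygoryan"), new Target());
        System.out.println(t); //prints "Karen Grygoryan"
    }
}
```

This also shows why direct field access matters here: the schema's snake_case names (first_name) match the generated Java fields exactly, while the setters use camelCase names, so matching on field names avoids any name translation.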

Finally, after running the main method, we see the following console output:

{"movie_name": "Casablanca", "genre": "Drama/Romance"}
{"first_name": "Karen", "last_name": "Grygoryan"}
****************
{"movie_name": "Totoro", "genre": "anime"}
{"first_name": "Leo", "last_name": "Messi"}

7 thoughts on "Avro and POJO conversions (tips for Kafka devs)"

  1. Useful utility.
    I have used it in my project too.
    But what happens when an object has nested objects?
    For example: a Customer POJO has an Address POJO.
    How will the mapRecordToObject() method work then?
    I have been struggling to make this conversion.
    If you are aware, can you please help?

  2. Thank you for such a good explanation! I am currently trying to solve a problem: how do you deserialize a POJO from an Avro message when there are multiple schemas per topic? The Avro message carries the writer's schema id, but not the reader's schema.
    My idea is this: on app start, register all reader schemas in a map of schemaName -> POJO. Override the default KafkaAvroDeserializer, and when the schema registry or cache retrieves the writer's schema, also retrieve the reader's schema from the map by schema name, since the names must match.
    Maybe you have already solved this task. I would really appreciate any help.

  3. Nice article. Can you also add an example where the Avro schema has a nested list of objects? For example, a movie can have a list of actors, and each actor can have firstName, lastName, etc.
