Apache Kafka + Spring Cloud Task

Recently I was working on a piece of a project that required a batch job: run the application once and, after it completes successfully, terminate it. Spring Cloud Task fits perfectly for this kind of requirement. Our usual stack of technologies is Spring Cloud Stream/Task and Apache Kafka. In my earlier posts, I showed you an example of how to use Spring Cloud Stream + Apache Kafka. However, I had never used Spring Cloud Task, so I started browsing for examples and… I found nothing. There is no example of Spring Cloud Task with Apache Kafka; at least I couldn't find one. So I dug into it and came up with a solution.

I want to start with my requirements:
1. Read all data from a Kafka topic
2. Transform the data
3. Push the transformed data to another Kafka topic
4. Terminate the application

Here is my solution:

First, let's spin up our infrastructure, Apache Kafka, in a Docker container.
Type this in your terminal:

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
      -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
      landoop/fast-data-dev:latest
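The task reads from the reddit_posts topic, which the fast-data-dev image pre-creates with sample data, and writes to reddit_posts_modified. If auto topic creation is disabled on your broker, you can create the output topic yourself. Here is a minimal sketch using Kafka's AdminClient; the class name and the 5-partition count (matching the listener concurrency we'll configure below) are my assumptions:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicSetup {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions to match the listener concurrency below,
            // replication factor 1 for a single-broker dev setup
            NewTopic topic = new NewTopic("reddit_posts_modified", 5, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}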

Project structure

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.5.RELEASE'
        confluentVersion = '4.1.0'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.karen'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
    maven { url "http://packages.confluent.io/maven/" }
}


ext {
    springCloudTaskVersion = '2.0.0.RELEASE'
}

dependencies {
    implementation('org.springframework.boot:spring-boot-starter-actuator')
    implementation('org.springframework.cloud:spring-cloud-starter-task')
    implementation('org.springframework.kafka:spring-kafka')
    implementation("io.confluent:kafka-avro-serializer:${confluentVersion}") {
        exclude(module: 'slf4j-log4j12')
        exclude(module: 'slf4j-api')
    }
    // in-memory DB, required by the Spring Cloud Task framework to store task executions; see the Spring docs for details
    runtimeOnly('com.h2database:h2')
    compileOnly('org.springframework.boot:spring-boot-configuration-processor')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-task-dependencies:${springCloudTaskVersion}"
    }
}

AppConfig.java

package com.karen.cloudtask.configuration;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@EnableTask
public class AppConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // set the number of concurrent threads reading from the Kafka topic;
        // essentially, this is the number of consumers in the consumer group.
        // Concurrency is 5 because the Kafka topic has 5 partitions: 1 thread per partition.
        factory.setConcurrency(5);
        // set the interval (in ms) after which an idle "notification" event is emitted
        // when a consumer receives no messages, i.e. there are no more messages in the topic
        factory.getContainerProperties().setIdleEventInterval(3000L);
        return factory;
    }

    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
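        // no explicit group.id needed here: spring-kafka uses the @KafkaListener id ("myid") as the consumer group id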
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        return props;
    }

    @Bean
    public ProducerFactory<GenericRecord, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        return props;
    }

    @Bean
    public KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

CloudTaskRunner.java

package com.karen.cloudtask.runner;

import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Component
public class CloudTaskRunner {

    private static final String LISTENER_ID = "myid"; // consumer group id
    private static final String LISTENER_ID_DASH = LISTENER_ID + "-";
    // must match the concurrency configured on the listener container factory
    private static final int CONSUMER_COUNT = 5;
    private final Set<String> consumerIds = new HashSet<>();

    private final KafkaListenerEndpointRegistry registry;
    private final KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;


    @Autowired
    public CloudTaskRunner(KafkaListenerEndpointRegistry registry, KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate) {
        this.registry = registry;
        this.kafkaTemplate = kafkaTemplate;
    }

    // set the group id and the Kafka topic, "reddit_posts"; the topic is created by default when you spin up the Kafka Docker image.
    @KafkaListener(id = LISTENER_ID, topics = "reddit_posts")
    public void receive(GenericRecord payload) {
        // transform the message; GenericRecord is an Avro message type
        payload.put("body", payload.get("body").toString().toUpperCase());
        // send the transformed data to the next Kafka topic
        kafkaTemplate.send("reddit_posts_modified", payload);
    }
    
    // Listen for the idle events: EACH consumer emits an event after it has been idle for 3 seconds.
    // The "condition" is our filter; it accepts only events whose listener id starts with our consumer group id, in our case "myid".
    @EventListener(condition = "event.listenerId.startsWith('" + LISTENER_ID_DASH + "')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        countEventsAndDeregister(event);
    }

    // Note that this method is synchronized because events are emitted from 5 consumers, i.e. 5 different threads.
    private synchronized void countEventsAndDeregister(ListenerContainerIdleEvent event) {
        // add only unique consumer ids to the HashSet.
        // Why do it this way? Each consumer/thread keeps emitting an idle event every 3 seconds,
        // so we don't want to stop the application just because one thread finished its job
        // faster and emitted 5 events; the set keeps only unique consumer/thread ids.
        consumerIds.add(event.getListenerId());
        // once all consumers are in the set, every one of them has finished reading
        // and we can terminate the application
        if (consumerIds.size() == CONSUMER_COUNT) {
            kafkaTemplate.flush();
            // stopping all listener containers terminates the application
            registry.stop();
        }
    }

}
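Stopping all listener containers via registry.stop() lets the application context close, and Spring Cloud Task records the run in its repository (the in-memory H2 database here). If you want to hook into that lifecycle, e.g. for logging, Spring Cloud Task provides the TaskExecutionListener interface. A minimal sketch (the class name is mine):

package com.karen.cloudtask.runner;

import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.stereotype.Component;

@Component
public class LoggingTaskListener implements TaskExecutionListener {

    @Override
    public void onTaskStartup(TaskExecution taskExecution) {
        System.out.println("Task started: " + taskExecution.getTaskName());
    }

    @Override
    public void onTaskEnd(TaskExecution taskExecution) {
        System.out.println("Task ended with exit code: " + taskExecution.getExitCode());
    }

    @Override
    public void onTaskFailed(TaskExecution taskExecution, Throwable throwable) {
        System.out.println("Task failed: " + throwable.getMessage());
    }
}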

CloudtaskApplication.java

package com.karen.cloudtask;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

//Nothing interesting here, boilerplate code
@SpringBootApplication
public class CloudtaskApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudtaskApplication.class, args);
    }
}
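Before running, make sure there is data in the input topic. The fast-data-dev image loads sample Avro data into reddit_posts on startup; if your topic is empty, a standalone producer along these lines can seed a few records. This is a rough sketch: the one-field schema is a hypothetical stand-in (the real sample records carry more fields, but our task only touches "body"):

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SeedProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        // hypothetical one-field schema; the task only reads "body"
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"RedditPost\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}");

        try (KafkaProducer<GenericRecord, GenericRecord> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("body", "sample post " + i);
                producer.send(new ProducerRecord<>("reddit_posts", record));
            }
            producer.flush();
        }
    }
}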

Let us run our application:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.5.RELEASE)

2018-10-19 16:49:09.397  INFO 81235 --- [           main] c.karen.cloudtask.CloudtaskApplication   : Starting CloudtaskApplication
....

2018-10-19 16:49:16.668  INFO 81235 --- [     myid-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2018-10-19 16:49:16.668  INFO 81235 --- [     myid-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-2-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-4-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-3-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.055  INFO 81235 --- [     myid-3-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.055  INFO 81235 --- [     myid-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.057  INFO 81235 --- [     myid-4-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.058  INFO 81235 --- [     myid-2-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:30.047  INFO 81235 --- [     myid-1-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:30.051  INFO 81235 --- [     myid-1-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:30.051  INFO 81235 --- [       Thread-8] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@23986957: startup date [Fri Oct 19 16:49:09 PDT 2018]; root of context hierarchy
2018-10-19 16:49:30.053  INFO 81235 --- [       Thread-8] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483547
2018-10-19 16:49:30.053  INFO 81235 --- [       Thread-8] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2018-10-19 16:49:30.055  INFO 81235 --- [       Thread-8] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2018-10-19 16:49:30.056  INFO 81235 --- [       Thread-8] o.s.j.d.e.EmbeddedDatabaseFactory        : Shutting down embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false'
2018-10-19 16:49:30.057  INFO 81235 --- [       Thread-8] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.

Process finished with exit code 0

As you can see, the application ran and, after the job completed, terminated itself. If you go to http://127.0.0.1:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/reddit_posts_modified/ you will see our new topic with the transformed data.
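Instead of the topics UI, you can also verify the output programmatically. A small throwaway consumer along these lines should print the upper-cased bodies; the group id "verify" and the 5-second poll are arbitrary choices:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class VerifyConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("reddit_posts_modified"));
            // a single long poll is enough for a quick manual check
            consumer.poll(5000).forEach(record -> System.out.println(record.value().get("body")));
        }
    }
}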
