Recently I was working on a piece of a project that required a batch job: run the application once and, after it completes successfully, terminate it. Spring Cloud Task fits this kind of requirement perfectly. Our usual technology stack is Spring Cloud Stream/Task and Apache Kafka. In my earlier posts, I showed you an example of how to use Spring Cloud Stream + Apache Kafka. However, I had never used Spring Cloud Task, so I started browsing for examples and... I found nothing. There is no example of Spring Cloud Task with Apache Kafka; at least I couldn't find one. So I dug into it and came up with a solution.
I want to start with my requirements:
1. Read all data from Kafka topic
2. Do transformation on this data
3. Push this data to another Kafka topic
4. Terminate application
Here is my solution:
First, let's spin up our infrastructure, Apache Kafka, in a Docker container.
Type in your terminal:
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
  -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
  landoop/fast-data-dev:latest
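Before moving on, it can be worth checking that the broker actually came up. Here is one quick check, as a sketch: it assumes you also passed --name kafka-dev to the docker run command above (the original command doesn't name the container), and that the image's bundled Kafka CLI tools are on the container's PATH:

# Hypothetical check: assumes the container was started with --name kafka-dev
docker exec kafka-dev kafka-topics --zookeeper localhost:2181 --list

Alternatively, you can simply open the UI bundled with the image at http://127.0.0.1:3030 and browse the topics there.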
Project structure
build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.5.RELEASE'
        confluentVersion = '4.1.0'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.karen'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
    maven { url "http://packages.confluent.io/maven/" }
}

ext {
    springCloudTaskVersion = '2.0.0.RELEASE'
}

dependencies {
    implementation('org.springframework.boot:spring-boot-starter-actuator')
    implementation('org.springframework.cloud:spring-cloud-starter-task')
    implementation('org.springframework.kafka:spring-kafka')
    implementation("io.confluent:kafka-avro-serializer:${confluentVersion}") {
        exclude(module: 'slf4j-log4j12')
        exclude(module: 'slf4j-api')
    }
    // in-memory DB, required by the Spring Cloud Task framework; see the Spring docs for details
    runtime('com.h2database:h2')
    compileOnly('org.springframework.boot:spring-boot-configuration-processor')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-task-dependencies:${springCloudTaskVersion}"
    }
}
AppConfig.java
package com.karen.cloudtask.configuration;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@EnableTask
public class AppConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Set the number of concurrent threads reading from the Kafka topic;
        // effectively this is the number of consumers in the consumer group.
        // Concurrency is 5 because the topic has 5 partitions: one thread per partition.
        factory.setConcurrency(5);
        // Set the interval (in ms) after which a "notification" event is emitted
        // when a consumer is idle, i.e. there are no more messages in the topic.
        factory.getContainerProperties().setIdleEventInterval(3000L);
        return factory;
    }

    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        return props;
    }

    @Bean
    public ProducerFactory<GenericRecord, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        return props;
    }

    @Bean
    public KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
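To get a feel for how GenericRecord and the KafkaTemplate bean above fit together, here is a hypothetical smoke test. It is not part of the project: it assumes a one-field schema containing only the "body" field, whereas the real reddit_posts schema has more fields, so the Schema Registry may reject it if the topic's real schema is already registered. Treat it purely as an illustration of the Avro and KafkaTemplate APIs:

// Hypothetical smoke test, not part of the original project.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.springframework.kafka.core.KafkaTemplate;

public class ProducerSmokeTest {

    static void sendSample(KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate) {
        // Assumed schema: a single "body" field, mirroring the field the task transforms.
        Schema schema = SchemaBuilder.record("Post").fields()
                .requiredString("body")
                .endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("body", "hello, kafka");
        kafkaTemplate.send("reddit_posts", record);
    }
}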
CloudTaskRunner.java
package com.karen.cloudtask.runner;

import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Component
public class CloudTaskRunner {

    private final String LISTENER_ID = "myid"; // consumer group id
    private final String LISTENER_ID_DASH = LISTENER_ID + "-";
    private Set<String> consumerIds = new HashSet<>();
    private final KafkaListenerEndpointRegistry registry;
    private KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    @Autowired
    public CloudTaskRunner(KafkaListenerEndpointRegistry registry,
                           KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate) {
        this.registry = registry;
        this.kafkaTemplate = kafkaTemplate;
    }

    // Setting my group id and the Kafka topic, "reddit_posts", which is created
    // by default when you spin up the Kafka Docker image.
    @KafkaListener(id = LISTENER_ID, topics = "reddit_posts")
    public void receive(GenericRecord payload) {
        // Transform the message; GenericRecord is an Avro message type.
        payload.put("body", payload.get("body").toString().toUpperCase());
        // Send the transformed data to the next Kafka topic.
        kafkaTemplate.send("reddit_posts_modified", payload);
    }

    // Listen for the events: EACH consumer emits an event after being idle for 3 seconds.
    // The "condition" is our filter, accepting only events whose listener id starts with
    // the consumer group id, in our case "myid".
    @EventListener(condition = "event.listenerId.startsWith('" + LISTENER_ID_DASH + "')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        countEventsAndDeregister(event);
    }

    // Note that this method is synchronized, because events are emitted
    // from 5 consumers, i.e. 5 different threads.
    private synchronized void countEventsAndDeregister(ListenerContainerIdleEvent event) {
        // Check whether all consumers are in our set.
        // If yes, all consumers have finished reading their messages
        // and we can terminate the application.
        if (consumerIds.size() > 4) {
            kafkaTemplate.flush();
            // Terminate the application by stopping all listener containers.
            registry.stop();
        }
        // Add only unique consumer ids to the set; remember, we need 5 unique ids.
        // Why do it this way? Each consumer/thread keeps emitting an idle event
        // every 3 seconds, so we must not stop the application just because one
        // thread finished its job faster and emitted 5 events. The set keeps
        // only the unique ids of the consumers/threads.
        consumerIds.add(event.getListenerId());
    }
}
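If you also want visibility into the task lifecycle itself, for example to log when Spring Cloud Task records the run as finished in its repository (the in-memory H2 database pulled in by build.gradle), you can register a TaskExecutionListener. This is a minimal sketch I am adding for illustration; it is not part of the project above:

package com.karen.cloudtask.runner;

import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.stereotype.Component;

// Hypothetical addition: logs the task lifecycle events recorded by Spring Cloud Task.
@Component
public class TaskLifecycleLogger implements TaskExecutionListener {

    @Override
    public void onTaskStartup(TaskExecution taskExecution) {
        System.out.println("Task started: " + taskExecution.getExecutionId());
    }

    @Override
    public void onTaskEnd(TaskExecution taskExecution) {
        System.out.println("Task finished: " + taskExecution.getExecutionId());
    }

    @Override
    public void onTaskFailed(TaskExecution taskExecution, Throwable throwable) {
        System.out.println("Task failed: " + throwable.getMessage());
    }
}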
CloudtaskApplication.java
package com.karen.cloudtask;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Nothing interesting here, just boilerplate code.
@SpringBootApplication
public class CloudtaskApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudtaskApplication.class, args);
    }
}
Let us run our application:
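Assuming the project keeps the standard Gradle wrapper (an assumption on my part; the wrapper files aren't shown above), it can be started with:

./gradlew bootRun

You should then see output similar to the following: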
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.5.RELEASE)

2018-10-19 16:49:09.397  INFO 81235 --- [           main] c.karen.cloudtask.CloudtaskApplication   : Starting CloudtaskApplication
....
2018-10-19 16:49:16.668  INFO 81235 --- [     myid-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2018-10-19 16:49:16.668  INFO 81235 --- [     myid-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-2-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-4-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.043  INFO 81235 --- [     myid-3-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:20.055  INFO 81235 --- [     myid-3-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.055  INFO 81235 --- [     myid-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.057  INFO 81235 --- [     myid-4-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:20.058  INFO 81235 --- [     myid-2-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:30.047  INFO 81235 --- [     myid-1-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2018-10-19 16:49:30.051  INFO 81235 --- [     myid-1-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2018-10-19 16:49:30.051  INFO 81235 --- [       Thread-8] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@23986957: startup date [Fri Oct 19 16:49:09 PDT 2018]; root of context hierarchy
2018-10-19 16:49:30.053  INFO 81235 --- [       Thread-8] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483547
2018-10-19 16:49:30.053  INFO 81235 --- [       Thread-8] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2018-10-19 16:49:30.055  INFO 81235 --- [       Thread-8] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2018-10-19 16:49:30.056  INFO 81235 --- [       Thread-8] o.s.j.d.e.EmbeddedDatabaseFactory        : Shutting down embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false'
2018-10-19 16:49:30.057  INFO 81235 --- [       Thread-8] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.

Process finished with exit code 0
As you can see, the application ran and, once the job completed, terminated itself. If you go to http://127.0.0.1:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/reddit_posts_modified/ you will see our new topic with the transformed data.
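If you prefer the terminal over the UI, the output topic can also be inspected with the Avro console consumer that ships with the fast-data-dev image. Again a sketch, assuming the container was named kafka-dev (not part of the original docker run command):

# Hypothetical: assumes the container was started with --name kafka-dev
docker exec kafka-dev kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic reddit_posts_modified \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081

Every message body should come back in upper case.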