Hi there! Spring Cloud Stream 2.0 recently introduced a new feature: polled consumers (PollableMessageSource), which let the application control the rate at which it reads from a source (Kafka, RabbitMQ). In other words, you can pause your stream. This is especially helpful with Kafka: before this feature, your application would just keep reading payloads from the topic non-stop, for as long as there were messages. But what if you need to pause the stream? Say we are consuming messages from a Kafka topic and sending the data on to some external service, and at some point that external service becomes unavailable.
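Before diving into the Spring code, the core idea can be sketched in plain Java. This is only a minimal illustration of the pattern, not Spring API: an in-memory queue stands in for the Kafka topic, and a health flag gates consumption, so a broken downstream service effectively pauses the stream while messages pile up on the "topic".

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java sketch of a polled consumer: messages are pulled only
// while a downstream health check passes. The queue stands in for a
// Kafka topic; class and method names here are illustrative.
public class PausableConsumer {
    private final Queue<String> topic = new ArrayDeque<>();
    private boolean downstreamHealthy = true;

    public void publish(String msg) { topic.add(msg); }
    public void setDownstreamHealthy(boolean healthy) { downstreamHealthy = healthy; }

    // Drain everything currently available, but only if downstream is up.
    public int drain() {
        if (!downstreamHealthy) return 0; // paused: messages stay on the "topic"
        int processed = 0;
        while (topic.poll() != null) processed++;
        return processed;
    }

    public static void main(String[] args) {
        PausableConsumer c = new PausableConsumer();
        c.publish("m1");
        c.publish("m2");
        c.setDownstreamHealthy(false);
        System.out.println(c.drain()); // paused -> 0
        c.setDownstreamHealthy(true);
        System.out.println(c.drain()); // resumed -> 2
    }
}
```

The real application below does the same thing, except the "topic" is Kafka and the drain loop is driven by a scheduler.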
For this use case, I created an application that deals with the issue.
Project structure:
DemoApplication.class
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // Enables Spring's scheduled task execution capability
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
CustomProcessor.class
package com.example.demo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;

public interface CustomProcessor {

    String POLLABLE_INPUT = "pollableInput";

    // Define our own message source, from which we are going to poll messages.
    // Note that it is an "Input", i.e. read-only: no writes to this channel.
    @Input(CustomProcessor.POLLABLE_INPUT)
    PollableMessageSource pollableInput();
}
ExternalService.class
package com.example.demo;

import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadLocalRandom;

@Service // Our fake service
public class ExternalService {

    public boolean isHealthy() {
        // Imagine we are checking the health of an external web service
        return ThreadLocalRandom.current().nextBoolean();
    }

    public void enrichData(String message) {
        // Sending data to the external service
    }
}
PollableMessageSourceRunner.class
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableBinding({CustomProcessor.class}) // Here we bind our custom channel
public class PollableMessageSourceRunner {

    private final CustomProcessor customProcessor;
    private final ExternalService externalService;

    @Autowired // Always, always, always autowire via constructor
    public PollableMessageSourceRunner(CustomProcessor customProcessor,
                                       ExternalService externalService) {
        this.customProcessor = customProcessor;
        this.externalService = externalService;
    }

    // This method is executed every 1000 ms
    @Scheduled(fixedDelayString = "1000")
    public void pollMessages() {
        boolean hasMessage = true;
        // Check whether the external service is healthy
        boolean healthy = externalService.isHealthy();
        if (healthy) { // only poll when our fake service is healthy
            // poll() returns a boolean: if the Kafka topic contained a
            // message, it invokes the handler passed to poll() and
            // returns 'true'.
            while (hasMessage) {
                // We pull messages one by one from the topic;
                // when the topic is empty, we stop.
                hasMessage = customProcessor.pollableInput().poll(message ->
                        // Send the message to the external service
                        externalService.enrichData(message.getPayload().toString())
                );
            }
        }
    }
}
application.yml
spring:
  cloud:
    stream:
      bindings:
        pollableInput:               # name of our custom channel
          group: pollable-group      # Kafka consumer group
          destination: kafka-topic   # Kafka topic to read from
      binder:
        brokers: kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
        zkNodes: kafka-1.com:2181,kafka-2.com:2181,kafka-3.com:2181
  kafka:
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}

ext {
    springCloudVersion = 'Finchley.RELEASE'
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-actuator')
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
    compileOnly('org.springframework.boot:spring-boot-configuration-processor')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.cloud:spring-cloud-stream-test-support')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
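One more thing worth thinking about with this loop: if enrichData throws inside the poll handler, the exception can propagate out of poll() and cut that scheduled run short. As a hedged sketch of one defensive option (plain Java with an in-memory queue standing in for the binder; SafePollLoop and all names here are illustrative, not Spring API), you can catch failures inside the handler and decide per message whether to skip, retry, or stop:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch: a drain loop whose handler survives a failing "enrich" call.
// The queue stands in for the Kafka-backed PollableMessageSource.
public class SafePollLoop {

    // Mimics the poll() contract: returns false when the source is empty,
    // otherwise invokes the handler with the next message and returns true.
    public static boolean poll(Queue<String> source, Consumer<String> handler) {
        String msg = source.poll();
        if (msg == null) return false;
        handler.accept(msg);
        return true;
    }

    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>();
        source.add("good");
        source.add("bad");
        source.add("also-good");

        int[] processed = {0};
        boolean hasMessage = true;
        while (hasMessage) {
            hasMessage = poll(source, msg -> {
                try {
                    // stand-in for externalService.enrichData(...)
                    if (msg.equals("bad")) throw new RuntimeException("enrich failed");
                    processed[0]++;
                } catch (RuntimeException e) {
                    // Log and decide: skip the message, retry it,
                    // or re-throw to stop the whole drain loop.
                    System.err.println("failed to enrich " + msg + ": " + e.getMessage());
                }
            });
        }
        System.out.println("processed=" + processed[0]); // prints processed=2
    }
}
```

Whether a re-thrown exception causes the message to be redelivered or dropped depends on the binder and its acknowledgement settings, so check your binder's documentation before picking a strategy.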
Can you please help me: what should I do if I want to enable batch mode and read multiple messages at once?
Hi Sohan,
AFAIK Spring Cloud Stream will support batch processing starting from version 3; right now it handles only single messages.
What happens if externalService.enrichData(message.getPayload().toString()) inside the polling handler throws an exception?