Spring Cloud Stream + Apache Kafka(PollableMessageSource)

Hi there! Spring Cloud Stream 2.0 recently introduced a new feature – polled consumers (PollableMessageSource) – where the application controls the rate at which it reads from a source (Kafka, RabbitMQ); in other words, you can pause your stream. This is especially helpful with Kafka. Before this feature, you would simply read payloads from the topic continuously, non-stop, for as long as it had messages. But what if you need to pause your stream? Say we are consuming messages from a Kafka topic and sending the data to some external service, and at some point the external service becomes unavailable.
For this use case, I created an application that deals with exactly this issue.

Project structure:

DemoApplication.class

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the demo application.
 *
 * <p>Besides the usual Spring Boot bootstrap, this class switches on scheduled task
 * execution via {@code @EnableScheduling}.
 */
@SpringBootApplication
@EnableScheduling //Enables Spring's scheduled task execution capability
public class DemoApplication {

    /**
     * Starts the application by handing this class and the command-line arguments
     * to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

CustomProcessor.class

package com.example.demo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;

/**
 * Binding contract for the demo: declares a single polled (pull-based) input.
 */
public interface CustomProcessor {

    /** Name of the input binding; the same name is used as the key under the bindings configuration. */
    String POLLABLE_INPUT = "pollableInput";

    /**
     * The message source the application pulls its messages from on demand.
     * It is declared as an input, i.e. it is read-only: nothing is written to this "channel".
     *
     * @return the pollable source bound under {@link #POLLABLE_INPUT}
     */
    @Input(POLLABLE_INPUT)
    PollableMessageSource pollableInput();
}

ExternalService.class

package com.example.demo;

import org.springframework.stereotype.Service;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Stand-in for a real external web service; exists only to drive the demo.
 */
@Service
public class ExternalService {

    /**
     * Pretends to probe the health of the external web service.
     *
     * @return a random boolean, so the service appears up or down arbitrarily
     */
    public boolean isHealthy() {
        final boolean simulatedStatus = ThreadLocalRandom.current().nextBoolean();
        return simulatedStatus;
    }

    /**
     * Placeholder for sending data to the external service; intentionally does nothing.
     *
     * @param message the payload that would be sent
     */
    public void enrichData(String message) {
        // No-op: a real implementation would send the data to the external service here.
    }
}

PollableMessageSourceRunner.class

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Periodically drains the pollable input and forwards every message to
 * {@link ExternalService}, but only when that service reports itself healthy.
 * While the service is unhealthy nothing is polled, which effectively pauses
 * consumption from the topic.
 */
@Component
@EnableBinding({CustomProcessor.class})//Here we bind our custom channel
public class PollableMessageSourceRunner {

    // Injected once through the constructor and never reassigned.
    private final CustomProcessor customProcessor;
    private final ExternalService externalService;

    /**
     * Creates the runner with its collaborators.
     *
     * @param customProcessor binding interface that exposes the pollable input
     * @param externalService service that receives each polled message
     */
    @Autowired //Always, always, always autowire via constructor
    public PollableMessageSourceRunner(CustomProcessor customProcessor, ExternalService externalService) {
        this.customProcessor = customProcessor;
        this.externalService = externalService;
    }

    /**
     * Scheduled with a fixed delay of 1000 ms. Checks the external service's health
     * once and, if it is healthy, polls message by message until the source reports
     * that there was nothing left to read.
     *
     * <p>The health check is performed once per run, not once per message.
     */
    @Scheduled(fixedDelayString = "1000")
    public void pollMessages() {
        if (!externalService.isHealthy()) {
            // Service is down: skip this run and leave the messages on the topic.
            return;
        }

        boolean hasMessage;
        do {
            // poll() returns true when a message was received and passed to the handler,
            // false when the topic had nothing to deliver.
            hasMessage = customProcessor.pollableInput().poll(
                    message -> externalService.enrichData(payloadAsString(message.getPayload())));
        } while (hasMessage);
    }

    /**
     * Renders a message payload as text.
     *
     * <p>Calling {@code toString()} on a {@code byte[]} yields an array identity string
     * rather than its content, so byte arrays are decoded explicitly. Any other payload
     * type is rendered exactly as before.
     * NOTE(review): UTF-8 is assumed for byte[] payloads - confirm against the producer.
     *
     * @param payload the message payload
     * @return the payload as a string
     */
    private static String payloadAsString(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
        }
        return payload.toString();
    }
}

application.yml

spring:
  cloud:
    stream:
      bindings:
        pollableInput: #name of our custom channel
          group: pollable-group #kafka consumer group
          destination: kafka-topic #setting kafka topic to read from
      # Binder settings belong under spring.cloud.stream.kafka.binder.
      # Nested under "bindings" they would be read as a binding called "binder"
      # and the broker list would never reach the Kafka binder.
      kafka:
        binder:
          brokers: kafka-1.com:9092, kafka-2.com:9092, kafka-3.com:9092
          # NOTE(review): zkNodes may no longer be needed by the 2.x Kafka binder - confirm before removing.
          zkNodes: kafka-1.com:2181, kafka-2.com:2181, kafka-3.com:2181
  kafka:
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

build.gradle

// Build script for the demo project.

// Makes the Spring Boot Gradle plugin available to this script at the version given below.
buildscript {
    ext {
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

// Repositories used to resolve the project's dependencies.
// NOTE(review): both versions declared in this script are RELEASE versions, so the
// snapshot and milestone repositories may be unnecessary - confirm before removing.
repositories {
    mavenCentral()
    jcenter()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


// Version of the spring-cloud-dependencies BOM imported in dependencyManagement below.
ext {
    springCloudVersion = 'Finchley.RELEASE'
}

// No versions are declared here; presumably they are supplied by the dependency
// management set up through the plugins and the BOM import below - verify.
dependencies {
    compile('org.springframework.boot:spring-boot-starter-actuator')
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
    compileOnly('org.springframework.boot:spring-boot-configuration-processor')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.cloud:spring-cloud-stream-test-support')

}

// Imports the Spring Cloud BOM at springCloudVersion.
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

3 thoughts on “Spring Cloud Stream + Apache Kafka(PollableMessageSource)”

  1. Can you please help me: what should I do if I want to enable batch mode and read multiple messages at once?

    1. Hi Sohan,
      AFAIK Spring Cloud Stream will support batch processing from version 3; right now it handles only single messages.

  2. What happens if externalService.enrichData(message.getPayload().toString()); inside the polling handler throws an exception?

Leave a Reply

Your email address will not be published. Required fields are marked *