In this post, I will explain how to solve a serialization issue with Spring beans and Kafka in a Spark application. I originally found a similar question on StackOverflow.
Here is our Spark application code:
@Component
public class SparkApplication implements ApplicationRunner {

    private final KafkaConfigurationProperties kafkaConfProps;

    @Autowired
    public SparkApplication(KafkaConfigurationProperties kafkaConfProps) {
        this.kafkaConfProps = kafkaConfProps;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Dataset<Row> dataset = ...; // Spark data
        dataset.foreachPartition(partition -> {
            /*
             * Say we want to use kafkaConfProps to create our KafkaProducer
             * and send some payload. Referencing the bean inside this lambda
             * is what forces Spark to serialize it.
             */
            KafkaProducer<String, String> kafkaProducer =
                    new KafkaProducer<>(kafkaConfProps.getKafkaProducerProperties());
            kafkaProducer.send(...);
        });
    }
}
Here is the KafkaConfigurationProperties class:
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
// Note that "kafka-properties" is the prefix of the matching section in your
// application.yml or application.properties file.
@ConfigurationProperties(prefix = "kafka-properties")
public class KafkaConfigurationProperties {

    private String kafkaBrokers;
    private String keySerializer;
    private String valueSerializer;
    private String producerTopic;

    public String getKafkaBrokers() {
        return kafkaBrokers;
    }

    public void setKafkaBrokers(String kafkaBrokers) {
        this.kafkaBrokers = kafkaBrokers;
    }

    public String getKeySerializer() {
        return keySerializer;
    }

    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    public String getValueSerializer() {
        return valueSerializer;
    }

    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    public String getProducerTopic() {
        return producerTopic;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    // Builds the Properties object the KafkaProducer constructor expects.
    public Properties getKafkaProducerProperties() {
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return kafkaProducerProperties;
    }
}
And the application.yml file:
...
kafka-properties:
  kafka-brokers: kafka-broker.com:9092
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  producer-topic: kafka-topic
...
If you try to run this code, you will get a serialization exception for KafkaConfigurationProperties: the lambda passed to foreachPartition captures the Spring bean, so the Spark driver has to serialize it to ship it to the executor nodes, and the bean is not serializable. Your first instinct will be to make the bean implement the Serializable interface, but that does not fix the exception; most likely this is because Spring enhances @Configuration classes with CGLIB proxies, so the injected object is actually a proxy holding non-serializable references. What you have to do instead, or at least the best solution I found to this problem, is to create a wrapper class that implements Serializable and simply copies the values from the Spring bean into its own fields.
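Before showing the wrapper, here is a minimal, self-contained sketch (my own illustration, not code from the Spark app) of the underlying mechanism: a serializable lambda drags every object it captures into serialization with it. The hypothetical Config class stands in for the Spring bean.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stand-in for the Spring bean; note that it does NOT implement Serializable.
    static class Config {
        String brokers = "kafka-broker.com:9092";
    }

    // A lambda assigned to this type is itself serializable...
    interface SerializableRunnable extends Runnable, Serializable {}

    public static void main(String[] args) throws IOException {
        Config config = new Config();
        SerializableRunnable task = () -> System.out.println(config.brokers);
        // ...but serializing it drags the captured Config along, so this line
        // throws java.io.NotSerializableException: CaptureDemo$Config
        new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(task);
    }
}

Spark does essentially the same thing when it ships the foreachPartition lambda to the executors. Now, the wrapper class itself: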
import java.io.Serializable;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaConfigurationPropertiesWrapper implements Serializable {

    private static final long serialVersionUID = 777L;

    private String kafkaBrokers;
    private String keySerializer;
    private String valueSerializer;
    private String producerTopic;

    public KafkaConfigurationPropertiesWrapper() {
    }

    // Copies the values out of the Spring bean, so the Spark lambda only ever
    // captures this plain, serializable object.
    public KafkaConfigurationPropertiesWrapper(KafkaConfigurationProperties conf) {
        this.kafkaBrokers = conf.getKafkaBrokers();
        this.keySerializer = conf.getKeySerializer();
        this.valueSerializer = conf.getValueSerializer();
        this.producerTopic = conf.getProducerTopic();
    }

    public String getKafkaBrokers() {
        return kafkaBrokers;
    }

    public void setKafkaBrokers(String kafkaBrokers) {
        this.kafkaBrokers = kafkaBrokers;
    }

    public String getKeySerializer() {
        return keySerializer;
    }

    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    public String getValueSerializer() {
        return valueSerializer;
    }

    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    public String getProducerTopic() {
        return producerTopic;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    // Builds the producer configuration on demand, on whichever node the
    // wrapper ends up on; a Properties field would be redundant here.
    public Properties getKafkaProducerProperties() {
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return kafkaProducerProperties;
    }
}
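If you want to convince yourself that the wrapper really survives serialization, you can round-trip it through plain Java serialization, which is the mechanism Spark uses for closures. This quick check is my own addition; the values simply mirror the application.yml above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class WrapperRoundTrip {

    public static void main(String[] args) throws Exception {
        KafkaConfigurationPropertiesWrapper wrapper = new KafkaConfigurationPropertiesWrapper();
        wrapper.setKafkaBrokers("kafka-broker.com:9092");
        wrapper.setKeySerializer("org.apache.kafka.common.serialization.StringSerializer");
        wrapper.setValueSerializer("org.apache.kafka.common.serialization.StringSerializer");
        wrapper.setProducerTopic("kafka-topic");

        // Serialize, then deserialize: exactly the step that failed for the Spring bean.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapper);
        }
        KafkaConfigurationPropertiesWrapper copy = (KafkaConfigurationPropertiesWrapper)
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();

        // Prints the reconstructed producer configuration.
        System.out.println(copy.getKafkaProducerProperties());
    }
}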
Here is the modified SparkApplication class, which uses the wrapper:
@Component
public class SparkApplication implements ApplicationRunner {

    private final KafkaConfigurationProperties kafkaConfProps;

    @Autowired
    public SparkApplication(KafkaConfigurationProperties kafkaConfProps) {
        this.kafkaConfProps = kafkaConfProps;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Copy the bean's values into a plain serializable object on the driver.
        KafkaConfigurationPropertiesWrapper kafkaPropsWrapper =
                new KafkaConfigurationPropertiesWrapper(kafkaConfProps);
        Dataset<Row> dataset = ...; // Spark data
        dataset.foreachPartition(partition -> {
            // The lambda now captures only the serializable wrapper, not the bean.
            // try-with-resources ensures the producer is closed on the executor.
            try (KafkaProducer<String, String> kafkaProducer =
                         new KafkaProducer<>(kafkaPropsWrapper.getKafkaProducerProperties())) {
                kafkaProducer.send(...);
            }
        });
    }
}
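The send(...) call is left elided, as in the original code. For completeness, here is one hypothetical way the partition could be drained to Kafka using the topic carried by the wrapper; the PartitionSender helper and the CSV payload are my own assumptions, not code from the original app:

import java.util.Iterator;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.Row;

public final class PartitionSender {

    // Drains one partition to Kafka: no key, row rendered as a CSV string.
    static void sendPartition(Iterator<Row> partition,
                              KafkaConfigurationPropertiesWrapper wrapper) {
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(wrapper.getKafkaProducerProperties())) {
            while (partition.hasNext()) {
                // mkString(",") is just a simple way to turn a Row into a String payload.
                producer.send(new ProducerRecord<>(wrapper.getProducerTopic(),
                        partition.next().mkString(",")));
            }
        }
    }
}

With a helper like this, the lambda body reduces to PartitionSender.sendPartition(partition, kafkaPropsWrapper).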
In this example, I used a properties class for simplicity, but you can create a wrapper class like this for any of your Spring beans in a Spark application.