In this blog, we will see how to use Kafka Streams together with Spring Boot.
Step #1.) Let’s first start the Confluent services on our local machine :-
Step #2.) Now, we can open the Control Center and create a new topic named “test-topic” :-
Step #3.) Next, we can start producing data into the Kafka topic :-
Step #4.) Then, we can start consuming data from the Kafka topic :-
Step #5.) Next, we begin with our streams application. Let’s first create the properties file :-
- The first one is known as the input/output channel binding. It defines the list of sources and destinations. We are defining only one channel: its name is “input-channel-1” and its destination is “users”. In other words, with this configuration we connect to the “users” Kafka topic through “input-channel-1” and read all of its messages.
- The second one is known as the binder. The binder will define our source and destination technology.
Spring Cloud offers us a number of binder technologies. We can use Apache Kafka, RabbitMQ, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs, and many more. In this blog, we shall focus on Apache Kafka. Spring Cloud offers us two types of Kafka binders.
- Apache Kafka Binder → It implements the Kafka Client APIs.
- Kafka Streams Binder → It is explicitly designed for Kafka Streams API.
In this blog, we will be focusing on the Kafka Streams Binder. We set the Kafka broker hostname and port so that the application can connect to the Kafka cluster. We also set the message key and value types. Since we expect a String key and a String value, we set StringSerde (the serializer/deserializer for strings) for both.
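The configuration described above could look roughly like the following. This is a minimal sketch, not necessarily the exact file used here: it assumes an `application.properties` file and a broker listening on `localhost:9092` (the usual local default); only the channel name “input-channel-1” and the “users” topic come from the text.

```properties
# Channel binding: "input-channel-1" reads from the "users" Kafka topic
spring.cloud.stream.bindings.input-channel-1.destination=users

# Kafka Streams binder: broker address (localhost:9092 is an assumed local default)
spring.cloud.stream.kafka.streams.binder.brokers=localhost:9092

# Default Serdes: String keys and String values
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
```

The same keys can be written in YAML form in `application.yaml` if you prefer that layout.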
Step #6.) Next, we create an interface to define the input/output bindings. We have named it KafkaListenerBinding. We want to listen to an input topic, so the input channel (defined in the properties file) must be picked up by the Spring Cloud framework and bound to the “users” Kafka topic. To achieve this, we define a binding interface and link the input channel using the @Input annotation.
- This method will read from a Kafka topic and return a KStream. The KStream is a Kafka message stream which, in our case, is made up of a String key and a String value.
- We also need to annotate this method with the @Input annotation and pass in the input channel name. That’s all.
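A binding interface along these lines could be sketched as follows. The method name `inputStream` is an assumption made for illustration, and the package declaration is left out for brevity; the channel name matches the one described for the properties file. It assumes the `spring-cloud-stream-binder-kafka-streams` dependency is on the classpath.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {

    // Binds the "input-channel-1" channel (mapped to the "users" topic in the
    // properties file) to a stream of String keys and String values.
    // The method name "inputStream" is hypothetical.
    @Input("input-channel-1")
    KStream<String, String> inputStream();
}
```

Spring Cloud Stream generates the implementation of this interface at startup, so no implementing class has to be written by hand.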
Here is the Spring Cloud version currently defined in our project :-
To avoid the deprecation warning, let’s lower our Spring Cloud version to a stable, widely adopted release, i.e. Hoxton.SR9.
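In a Maven project generated by Spring Initializr, the Spring Cloud version is typically controlled by a single property in `pom.xml`. The fragment below is a sketch under that assumption; a Gradle build or a hand-written POM would set the version differently.

```xml
<properties>
    <!-- Assumed Spring Initializr layout: this property feeds the
         spring-cloud-dependencies BOM in dependencyManagement -->
    <spring-cloud.version>Hoxton.SR9</spring-cloud.version>
</properties>
```

Hoxton releases target Spring Boot 2.2.x and 2.3.x, so the Spring Boot parent version may need to be lowered as well; it is worth checking the Spring Cloud compatibility table before changing it.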
Step #7.) Next, we move on to the KafkaListenerService. The @EnableBinding annotation triggers the Spring Cloud framework, which then implements the binding interface and creates a Kafka stream. We can listen to this stream using a listener method: the listener method receives the input stream, and we send its contents to the log.
- We want to bind this class to the Spring Cloud Stream infrastructure, and for that purpose we use the @EnableBinding annotation, passing in our binding interface (KafkaListenerBinding). This triggers the Spring Cloud Stream framework, which connects to the Kafka input channel using the Kafka Streams API and starts consuming the input messages as a KStream.
- The next step is to process each message from the KStream. The listener method is annotated with @StreamListener for the given input channel. The Spring Cloud framework will call this method and pass in the KStream. Now, we can do whatever we want with this KStream.
- In this use case, we simply log each key/value message as it is received. So, we use the foreach() method on the input stream, which takes a key/value lambda, and log the key and the value.
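Putting these points together, the listener service could be sketched as below. The method name `process`, the `@Service` stereotype, and the SLF4J logger are assumptions chosen for illustration; KafkaListenerBinding is the binding interface from Step #6, and the package declaration is left out for brevity.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(KafkaListenerBinding.class) // hooks this class into Spring Cloud Stream
public class KafkaListenerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaListenerService.class);

    // Called once by the framework with the KStream bound to "input-channel-1".
    // The method name "process" is hypothetical.
    @StreamListener("input-channel-1")
    public void process(KStream<String, String> input) {
        // foreach() is a terminal operation: it runs the lambda for every record.
        input.foreach((key, value) -> log.info("Key: {}, Value: {}", key, value));
    }
}
```

Note that the listener method is invoked once to build the stream topology; the lambda passed to foreach() is what runs for each incoming record.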
That’s all for this blog. See you in the next post with some more details.