System Design with Kafka Streams

Kafka Streams Fundamentals :-

Kafka Streams is built on top of the Kafka client APIs. It leverages Kafka’s native capabilities to offer data parallelism, distributed coordination and fault tolerance. By default, a Streams application runs as a single-threaded application.

To run multiple threads within a single instance, we can set the following property, and the streaming application becomes multi-threaded :-

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

With multiple threads running on a single machine, each thread executes one or more stream tasks. With the setting above, the degree of parallelism becomes 3 (provided there are at least 3 tasks to run). This is one way of scaling our application vertically, but the number of threads is limited by the resources of the single machine. Hence we have another way of scaling, i.e. adding more machines.

Question: What is a stream task, and how do multiple tasks share the load?

Answer:

  • We define the computational logic (in a Kafka Streams application) using the Streams DSL or the Processor API, and it gets bundled into a Topology. The topology is the unit of execution.
  • At runtime, the framework creates logical stream tasks; each of these tasks instantiates its own copy of the topology and executes it independently, in parallel with the others.

Example :- Let’s say we have 2 Kafka topics and a streaming application running on a single machine. Let’s also assume each topic has 3 partitions. Then :-

  • The framework creates a fixed number of logical tasks on this single machine.
  • The number of tasks is equal to the number of partitions in the input topic. If there is more than one input topic, the number of tasks is equal to the largest partition count among the input topics. Thus, in our scenario, 3 tasks are created.
  • Next, the framework assigns the partitions to these tasks evenly, i.e. one partition from each topic to each task.
  • This initial assignment of partitions to tasks never changes. Hence the number of tasks is fixed, and it is the maximum degree of parallelism for the application. These are all logical tasks.
  • At this stage, these 3 tasks are ready to be assigned to application threads or instances. Let’s further assume that we started our application on 2 machines, each running a single thread. The framework then assigns one task to each instance, and the remaining third task goes to one of the two instances.
  • Say we now want to scale out this application with another instance, i.e. an altogether new machine. The framework automatically migrates one of the tasks to the new instance under the hood, without the developer having to do anything.
  • Say we add a few more instances after that. Those new instances remain idle, because the degree of parallelism depends directly on the number of partitions, and hence there is no task left to assign to them.
  • Fault-tolerance :- Say the active instance-3 (on which stream task 3 is running) goes down :-

In this case, automatic rebalancing is again triggered under the hood, and the framework reassigns the third task to one of the remaining instances, like this :-

Thus, the bottom line is: increasing the number of threads may give us higher parallelism, but to get fault tolerance we must use multiple machines, so that if one instance/machine goes down, our stream tasks can move to another functional machine. The good thing is that this handling is transparent to the end user and is taken care of by the framework itself.
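The task arithmetic described above can be sketched in plain Java. This is only an illustration of the counting rules and not the actual Kafka Streams assignor: the class name, the method names and the round-robin placement are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is NOT the Kafka Streams assignor.
final class TaskAssignmentSketch {

    // Number of tasks = largest partition count among the input topics.
    static int taskCount(int... partitionsPerTopic) {
        return Arrays.stream(partitionsPerTopic).max().orElse(0);
    }

    // Spread task ids 0..taskCount-1 over the instances round-robin (assumed policy).
    // Returns one list of task ids per instance; surplus instances stay empty (idle).
    static List<List<Integer>> assign(int taskCount, int instances) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            result.get(task % instances).add(task);
        }
        return result;
    }

    public static void main(String[] args) {
        int tasks = taskCount(3, 3);           // two topics with 3 partitions each
        System.out.println(tasks);             // 3
        System.out.println(assign(tasks, 2));  // [[0, 2], [1]]
        System.out.println(assign(tasks, 3));  // [[0], [1], [2]]
        System.out.println(assign(tasks, 5));  // [[0], [1], [2], [], []]
    }
}
```

With 5 instances, the last two lists are empty, which mirrors the idle instances mentioned above.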

Alternatives to implement Streams :-

a.) Streams DSL

b.) Processor APIs.

Details for Streams DSL :-

// CREATE CONFIGURATIONS
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// CREATE STREAMS TOPOLOGY
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
//kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
Topology topology = streamsBuilder.build();

// START THE STREAMS
KafkaStreams streams = new KafkaStreams(topology, props);
logger.info("Starting stream.");
streams.start();

// SHUTDOWN HOOK
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down stream");
streams.close();
}));

Every Kafka Streams application starts by subscribing to some Kafka topic, because we need to consume a stream before we can process it. Next, we create our topology by building upon the source processor. The code snippet below creates a simple source processor :-

KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);

A source processor internally uses a Kafka consumer to consume messages from one or more Kafka topics. As the snippet above shows, this produces a KStream object, which the downstream processor uses as its input stream. Once we have our source processor and stream object ready, we can add more processors to our topology. Adding a new processor node is as simple as calling a transformation method on the KStream object obtained in the previous step. The code snippet below adds only ONE processor to the topology :-

kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));

KSTREAMS :- A KStream is an abstraction over a stream of Kafka message records. Among others, the following transformations are supported on a stream :-

  • filter() → returns a KStream
  • map() → returns a KStream
  • flatMap() → returns a KStream
  • to() → returns void; terminating / sink processor.
  • foreach() → returns void; terminating processor.

We can then chain processors on the KStream object. Let’s take a real-life example and implement the following problem statement :-

We have multiple POS terminals which keep sending a MessageRecord to the Kafka cluster for every invoice generated. We want the following things :-

a.) For all ‘HOME-DELIVERY’ invoices, we want to push them to another topic, say the ‘Shipment-Service’ topic.

b.) For all invoices from ‘PRIME’ customers, we want to reformat the MessageRecord into a ‘Notification’ object and then push it to another topic, the ‘Loyalty-Management-Service’ topic.

c.) For all invoices, we need to mask the personal information, reformat the MessageRecord into the ‘HadoopRecord’ type and send it to another topic, from which the data is dumped into the Hadoop storage system for trend analytics and batch analytics later.

Let’s create the ‘Directed Acyclic Graph’ (DAG) for this requirement, i.e. model the data pipeline :-

Let’s code it now :- The first step is to build a Topology (i.e. the unit of execution) for this requirement.

i.) Create a StreamsBuilder.

ii.) Create a new KStream using this builder. This is our source stream. We can use the builder.stream() method to create it, passing 2 parameters: first the topic name, and second a Consumed object carrying the serdes used to deserialize the record key and value read from the topic. Since we are consuming from the source topic, the snippet goes like this :-

StreamsBuilder builder = new StreamsBuilder();
KStream<String, PosInvoice> KS0 = builder.stream(AppConfigs.posTopicName,
Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

iii.) Next, for requirement (a), we keep only those records whose delivery type is ‘HOME-DELIVERY’ and push them to the new topic.

  • For filtering, we pass a lambda expression of type Predicate<K,V> as the argument to KStream’s filter method. The predicate defines which records to keep; records for which it returns false are dropped.
  • After filtering, we use the ‘to’ method, which internally uses a producer to push each record to the desired topic. The ‘to’ method is a SINK processor; we pass it 2 arguments: first the target topic name, and second a Produced object carrying the serdes with which the key and value are serialized.
KS0
.filter((k, v) -> v.getDeliveryType().equalsIgnoreCase(
AppConfigs.DELIVERY_TYPE_HOME_DELIVERY))
.to(AppConfigs.shipmentTopicName, Produced.with(AppSerdes.String(), AppSerdes.PosInvoice()));

iv.) Next, for requirement (b), we keep only those records whose customer type is ‘PRIME’, convert them to the requisite format and push the new MessageRecords to the new topic.

  • For filtering, we again pass a lambda expression of type Predicate<K,V> to KStream’s filter method; it defines which records to keep.
  • After filtering, we convert these ‘PosInvoice’ MessageRecords to ‘Notification’ MessageRecords using mapValues().
  • Next, we use the ‘to’ method, which internally uses a producer to push each record to the desired topic. As before, it is a SINK processor taking the target topic name and a Produced object carrying the serdes used for serialization.
KS0
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(
AppConfigs.CUSTOMER_TYPE_PRIME))
.mapValues(invoice -> RecordMaker.getNotificationTypeRecordFromInvoiceTypeRecord(invoice))
.to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

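v.) Requirement (c) follows the same pattern, except that no filter is needed: we mask each invoice with mapValues(), expand it into one or more HadoopRecords with flatMapValues() and push them to the Hadoop topic.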

KS0
.mapValues(invoice -> RecordBuilder.getMaskedInvoice(invoice))
.flatMapValues(invoice -> RecordBuilder.getHadoopRecords(invoice))
.to(AppConfigs.hadoopTopic, Produced.with(AppSerdes.String(), AppSerdes.HadoopRecord()));

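vi.) As the last step, we start the KafkaStreams instance and register a shutdown hook :-

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Close the streams instance cleanly when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));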

Let’s say there is an enhancement to requirement (b) above. For all invoices from ‘PRIME’ customers, we now want to reformat the MessageRecord into the ‘Notification’ type, add the previously accumulated reward points, and then push this new MessageRecord to the ‘Loyalty-Management-Service’ topic. The DAG for this requirement would look something like this :-

  • First, we create a source processor to read the stream of invoices, i.e. KS0.
  • Then, from the KS0 stream, we keep only the invoices of PRIME customers. This new stream is called KS3. At this stage, the processor involved is a stateless processor.
  • Next, we compute the reward points the customer earned for this invoice and include them in the ‘Notification’ MessageRecord. We apply the mapValues() processor to map the invoice MessageRecords to Notification MessageRecords. Let’s call this new stream of ‘Notification’ MessageRecords KS4.
  • Now, we want to add the previously earned points to the points earned through this invoice. We can achieve this with a new processor that queries a lookup table (holding the total points earned by this customer so far), adds that total to the current reward points, updates the ‘Notification’ MessageRecord, and writes the updated total back to the same lookup table for the next query. Please note that the processor involved at this stage is a stateful processor, because to process the KS4 stream it needs to know about previous records as well.
  • Finally, we send the updated ‘Notification’ MessageRecord to another topic, to be picked up by the SMS service.

Please note that such a lookup table is known as State in real-time streaming applications. It is a fundamental concept for any streaming application. In general, state can be kept in the following types of stores :-

  • Non-persistent / in-memory store → risks being wiped out in case of a crash.
  • Persistent / remote databases (like PostgreSQL, Cassandra, Aerospike, etc.) → present a performance bottleneck due to the network calls involved.

Out of the box, Kafka Streams also gives us 2 options for maintaining state stores :-

  • Fault-tolerant in-memory state stores.
  • Recoverable persistent state stores.

We are now in a position to define the types of processors :-

  • Stateless processors → Stateless transformations don’t require any state and hence don’t need a lookup table. E.g. processors such as mapValues(), filter(), flatMapValues(), etc.
  • Stateful processors → Stateful transformations do require state and hence need a lookup table (i.e. a state store) to compute their output. E.g. aggregations, joins, windowing, etc.
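The difference between the two processor types can be sketched in plain Java, with a HashMap standing in for the state store. This is a minimal illustration and uses no Kafka classes; the class name and the conversion rate of 1 point per 10 currency units are assumptions made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of stateless vs stateful processing (no Kafka classes).
final class StatelessVsStatefulSketch {

    // Hypothetical conversion rate: 1 reward point per 10 currency units.
    static final int UNITS_PER_POINT = 10;

    // Stateless: the result depends only on the current invoice amount.
    static int earnedPoints(int invoiceAmount) {
        return invoiceAmount / UNITS_PER_POINT;
    }

    // Stateful: a HashMap stands in for the state store (the lookup table).
    private final Map<String, Integer> rewardsStore = new HashMap<>();

    // Adds the points of this invoice to the customer's running total and stores it.
    int totalPoints(String customerId, int invoiceAmount) {
        int total = rewardsStore.getOrDefault(customerId, 0) + earnedPoints(invoiceAmount);
        rewardsStore.put(customerId, total);
        return total;
    }

    public static void main(String[] args) {
        StatelessVsStatefulSketch processor = new StatelessVsStatefulSketch();
        System.out.println(earnedPoints(200));                 // 20
        System.out.println(processor.totalPoints("C1", 200));  // 20
        System.out.println(processor.totalPoints("C1", 50));   // 25
        System.out.println(processor.totalPoints("C2", 100));  // 10
    }
}
```

The second call for C1 returns 25 only because the first result was remembered; that remembered value is exactly what the state store provides.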

Solution to enhanced requirement :-

Let’s code it now :- The first step, again, is to build a Topology (i.e. the unit of execution) for this requirement.

i.) Create a StreamsBuilder.

ii.) Create a new KStream using this builder. This is our source stream. We can use the builder.stream() method to create it, passing 2 parameters: first the topic name, and second a Consumed object carrying the serdes used to deserialize the record key and value read from the topic. Since we are consuming from the source topic, the snippet goes like this :-

StreamsBuilder builder = new StreamsBuilder();
KStream<String, PosInvoice> KS0 = builder.stream(AppConfigs.posTopicName,
Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

iii.) Next, for this enhanced requirement, we keep only those records whose customer type is ‘PRIME’.

  • For filtering, we pass a lambda expression of type Predicate<K,V> as the argument to KStream’s filter method. The predicate defines which records to keep; records for which it returns false are dropped.
KStream<String, PosInvoice> KS1 = KS0.filter((key, value) ->
value.getCustomerType().equalsIgnoreCase(AppConfigs.CUSTOMER_TYPE_PRIME));

iv.) The next step is to create a StoreBuilder. Instead of creating a state store directly, we create a StoreBuilder; using it, each stream task can create its own copy of the state store. If we created a single store instance and added it to the topology, that instance would have to be shared across tasks, and Kafka Streams is not designed that way: it executes multiple tasks independently so that we can scale the application vertically and horizontally. A store name is mandatory for any state store. Finally, the StoreBuilder needs to be added to the StreamsBuilder, so that the topology knows about the state store. The snippet below demonstrates the creation of an in-memory key-value store.

StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(AppConfigs.REWARDS_STORE_NAME),
AppSerdes.String(),
AppSerdes.Double()
);
builder.addStateStore(kvStoreBuilder);

We can also create a persistent key-value store, as in the snippet below. It creates an embedded RocksDB database.

StoreBuilder kvStoreBuilder1 = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.REWARDS_STORE_NAME),
AppSerdes.String(), AppSerdes.Double()
);
builder.addStateStore(kvStoreBuilder1);

v.) The next step is to transform the value from the ‘PosInvoice’ type to the ‘Notification’ type of MessageRecord. Here is what approach no. 1 looks like :-

Java 7 style would look something like :-

KS1.transformValues(new ValueTransformerSupplier<PosInvoice, Notification>() {
@Override
public ValueTransformer<PosInvoice, Notification> get() {
// Return the ValueTransformer that holds the business logic
// (the RewardsTransformer class shown further below)
return new RewardsTransformer();
}
}, AppConfigs.REWARDS_STORE_NAME);

The Java 8 lambda way of doing this is shown below. We have a ‘RewardsTransformer’ class that transforms a PosInvoice into a Notification.

KS1.transformValues(() -> new RewardsTransformer(), AppConfigs.REWARDS_STORE_NAME)
.to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

Important Points :-

  • We need to pass a ‘ValueTransformerSupplier’ to the transformValues() method, along with the names of the state stores the transformer needs to access.
  • The ‘ValueTransformer’ interface is provided by the Kafka Processor API. The transformValues() method of the Streams DSL is the single hook through which we supply our business logic, and in that way it combines the Streams DSL with the Processor API.
  • The init() method of the ‘ValueTransformer’ is called only once, when the transformer is initialised. We use the ProcessorContext to get the state store. Here is the snippet for it :-
private KeyValueStore<String, Double> stateStore;

@Override
public void init(ProcessorContext processorContext) {
this.stateStore = (KeyValueStore<String, Double>) processorContext.getStateStore(AppConfigs.REWARDS_STORE_NAME);
}
  • The transform() method is called once for every message in the stream. Here is what the snippet looks like :-
@Override
public Notification transform(PosInvoice posInvoice) {
Notification notification = new Notification()
.withInvoiceNumber(posInvoice.getInvoiceNumber())
.withCustomerCardNo(posInvoice.getCustomerCardNo())
.withTotalAmount(posInvoice.getTotalAmount())
.withEarnedLoyaltyPoints(posInvoice.getTotalAmount() * AppConfigs.LOYALTY_FACTOR)
.withTotalLoyaltyPoints(0.0);
Double accumulatedRewards = stateStore.get(notification.getCustomerCardNo());
Double totalRewards;
if (accumulatedRewards != null)
totalRewards = accumulatedRewards + notification.getEarnedLoyaltyPoints();
else
totalRewards = notification.getEarnedLoyaltyPoints();
stateStore.put(notification.getCustomerCardNo(), totalRewards);
notification.setTotalLoyaltyPoints(totalRewards);

return notification;
}

Some important pointers and issues regarding the above topology :-

  • On the producer side, we used the StoreId as the message key when sending invoices to the Kafka topic, together with the default partitioner. The default partitioner computes a hash of the message key to route the message to a partition. Therefore, all invoices with the same key land in the same partition.
  • Below is what the structure of our application looks like. We have 3 stores sending invoices to the topic ‘POS’. This topic has 3 partitions, and there are 3 stream-processing tasks working in parallel to share the load.

The problem is this: say a particular customer, Alex (customer C1), visits store S1 today and purchases something, then visits store S2 tomorrow and purchases something again. His invoices from S1 may land in partition P1 while his invoices from S2 land in partition P2. Each stream-processing task maintains its own separate state store, so Task1 may hold x1 as the reward points for customer C1 in its local state store while Task2 holds x2 for the same customer. Neither task sees the customer’s full total, which is an inconsistent state. Let’s see how we can solve this problem :-

Solution #1 :- We change the key itself, so that all invoices belonging to a particular customer land in the same partition, no matter which store the transaction comes from. Earlier we were using the ‘StoreId’ as the message key; going forward we use the ‘CustomerId’ as the message key.

Since the default partitioner uses the hash of the message key, all messages belonging to the same customer land in the same partition. Now, since our original topic uses the StoreId as the key, another way to solve this is to go through a temporary topic, but this comes with its own complexities of writing to and reading from the new topic.
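The effect of the key choice can be illustrated with a small plain-Java sketch. It is a simplification: Kafka’s default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses String.hashCode() purely to stay dependency-free. The class and method names are made up for illustration.

```java
// Plain-Java illustration of key-based partitioning (not Kafka's actual partitioner).
final class KeyPartitioningSketch {

    // Stand-in for the default partitioner: hash of the key modulo the partition count.
    // Kafka hashes the serialized key bytes with murmur2; String.hashCode() is an
    // assumption used here only to keep the sketch self-contained.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Two invoices of the same customer C1, bought at stores S1 and S2.
        String[][] invoices = { {"S1", "C1"}, {"S2", "C1"} };
        for (String[] invoice : invoices) {
            int byStore = partitionFor(invoice[0], partitions);
            int byCustomer = partitionFor(invoice[1], partitions);
            System.out.println("byStore=" + byStore + " byCustomer=" + byCustomer);
        }
        // prints:
        // byStore=0 byCustomer=2
        // byStore=1 byCustomer=2
    }
}
```

Keyed by StoreId, the two invoices of C1 are split across partitions; keyed by CustomerId, both reach the same partition and therefore the same task and state store.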

Solution #2 :- We repartition the invoice data by sending the invoices through an intermediate topic with a custom partitioner. This can be achieved using the ‘through’ method of the KStream DSL (deprecated since Kafka Streams 2.6 in favour of ‘repartition’). Please note that repartitioning can be time-consuming and may impact the performance of the streaming application, but sometimes it is unavoidable.

// KS1 is the PRIME-only stream filtered from KS0 above
KS1
.through(AppConfigs.REWARDS_TEMP_TOPIC, Produced.with(AppSerdes.String(), AppSerdes.PosInvoice(), new RewardsPartitioner()))
.transformValues(() -> new RewardsTransformer(), AppConfigs.REWARDS_STORE_NAME)
.to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

Both of the above approaches still share an inherent problem: relocating a state store from one machine to another. Let’s take two scenarios :-

  • Scenario #1 :- Say we had 3 tasks initially, each working on one partition, and all of a sudden stream task 3 crashes. With task 3 going down, the state store that was local to it is lost as well.
  • Scenario #2 :- Say we had one application instance initially, working on all 3 partitions, and then we realised we needed to scale the application and added 2 new instances. The new instances need access to the state built from the previously processed messages. Say the data of customer C1 was landing in partition 2, and the task for partition 2 has now moved to instance 2. Instance 2 then needs the state-store information of C1 first.

Let’s now see how Kafka Streams relocates local state stores in order to achieve fault tolerance and scalability. Each state store can be backed up to a Kafka topic. This topic is generally called the state changelog, and it is enabled by default. We generally should not disable it.
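The idea behind the changelog can be sketched in plain Java: every write to the local store is also appended to a log, and a lost store is rebuilt by replaying that log. This is a simplified model, not the Kafka Streams implementation; a List stands in for the changelog topic, and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a changelog-backed state store (illustration only).
final class ChangelogRestoreSketch {

    // One changelog entry: the key and its new value (a null value means delete).
    static final class Change {
        final String key;
        final Double value;
        Change(String key, Double value) { this.key = key; this.value = value; }
    }

    // A local store that appends every write to a changelog (stand-in for a topic).
    static final class LoggedStore {
        final Map<String, Double> local = new HashMap<>();
        final List<Change> changelog;
        LoggedStore(List<Change> changelog) { this.changelog = changelog; }

        void put(String key, Double value) {
            local.put(key, value);
            changelog.add(new Change(key, value));
        }
    }

    // Rebuild a store elsewhere by replaying the changelog from the beginning.
    static Map<String, Double> restore(List<Change> changelog) {
        Map<String, Double> rebuilt = new HashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                rebuilt.remove(c.key);
            } else {
                rebuilt.put(c.key, c.value);
            }
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        List<Change> changelog = new ArrayList<>();
        LoggedStore task3Store = new LoggedStore(changelog);
        task3Store.put("C1", 20.0);
        task3Store.put("C1", 25.0);
        task3Store.put("C2", 10.0);
        // The machine running task 3 crashes: its local map is gone, the changelog survives.
        Map<String, Double> restored = restore(changelog);
        System.out.println(restored.get("C1")); // 25.0
        System.out.println(restored.get("C2")); // 10.0
    }
}
```

The same replay serves both scenarios above: recovery after a crash and warming up a store on a newly added instance.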

Conclusions regarding Streams:-

  • Kafka Streams provides a fault-tolerant mechanism for processing real-time streams of data.
  • We create a KStream by opening a stream on a particular topic.
  • Working with a KStream is like working with individual messages, one at a time.
  • If we need to remember past information, or we want to work with a group of messages, we need a local state store. Every state store ultimately looks like a table.

Introducing KTable / Change-log-Stream :-

  • The Kafka Streams API allows us to create tables, using an abstraction called KTable.
  • A KTable can be thought of as a local key-value state store, in which we store a message key and the corresponding value.
  • We create a KTable by opening a table on a particular Kafka topic. Once we open a KTable for a topic, all the messages landing in this topic come and sit in this table.
  • A KTable can be visualised as a table with a primary key. There can be only one value for any key in the KTable; we can’t have duplicates. Each record in a KTable is an upsert (i.e. insert if the key is new, update otherwise). If we put a record whose value is NULL, the record for that key is deleted from the KTable.
  • One important difference between a KStream and a KTable is that in a KStream all records flow in sequence and the current record has no relation to any previous record, whereas in a KTable a new record may update an older record if its key already exists in the state store / KTable.
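The two interpretations of the same records can be sketched in plain Java. This is only a model of the semantics described above (no Kafka classes are involved); the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream vs table semantics (illustration only).
final class StreamVsTableSketch {

    // Stream view: every record is kept as an independent event.
    static List<String> asStream(List<String[]> records) {
        List<String> events = new ArrayList<>();
        for (String[] r : records) {
            events.add(r[0] + "=" + r[1]);
        }
        return events;
    }

    // Table view: upsert by key; a null value deletes the key.
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            if (r[1] == null) {
                table.remove(r[0]);
            } else {
                table.put(r[0], r[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"K1", "V1"},
                new String[] {"K2", "V1"},
                new String[] {"K1", "V2"},   // update of K1
                new String[] {"K2", null});  // delete of K2
        System.out.println(asStream(records)); // [K1=V1, K2=V1, K1=V2, K2=null]
        System.out.println(asTable(records));  // {K1=V2}
    }
}
```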

Let’s now see a real-time example of processing a stream of messages into a KTable. Here is the requirement :-

Let’s start by opening a KTable on this topic :-

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName);

Let’s now see what messages we are getting into this KTable. Please note that KTable doesn’t provide any print, peek or foreach methods. So we have to convert our KTable to a KStream in order to print it. Here is how we can achieve that :-

KT0.toStream().print(Printed.<String, String>toSysOut().withLabel("KT0"));

Now, let’s do some filtering of the MessageRecords arriving in this topic. KTable also supports the ‘filter’ method, and the result of a table filter is another table.

KTable<String, String> KT1 = KT0
.filter((k, v) -> k.matches(AppConfigs.regExSymbol) && !v.isEmpty(), Materialized.as(AppConfigs.stateStoreName));
KT1.toStream().print(Printed.<String, String>toSysOut().withLabel("KT1"));

Please note that the KTables we created above internally create state stores. If we want to work with those state stores ourselves, we can name them explicitly, i.e. we can ‘materialise’ any particular KTable.

  • Example no. 1 :- Here, a KTable is created from the source topic and materialised as an internal state store named stateStoreName1.
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName, Materialized.as(AppConfigs.stateStoreName1));
  • Example no. 2 :- Here, a KTable is derived by filtering another table and materialised as an internal state store named stateStoreName.
KTable<String, String> KT1 = KT0.filter((k, v) -> k.matches(AppConfigs.regExSymbol) && !v.isEmpty(), Materialized.as(AppConfigs.stateStoreName));

Caching of the data in KTable :-

Data may become visible in a KTable a little later than in a KStream. Let’s see the reason for this :-

  • A KTable is responsible for forwarding records to the downstream processors as well as to its internal state store (i.e. the underlying RocksDB database). However, the records don’t reach the downstream processors immediately, because the KTable caches the records in memory and waits a while for more records to arrive. This is an optimisation to reduce the number of operations.
  • During this wait period, the KTable updates the records in memory and forwards only the latest record for each key to the downstream processors. Say the wait period is configured as 100 ms, and during those 100 ms 7 MessageRecords arrive for a particular key. With this approach, only 1 record is forwarded to the downstream processors instead of 7.
  • Thus, only the latest record per key is sent to the downstream processors. The net result remains the same, because the new record (with key K1 and value V2) would override the older record (with key K1 and value V1) anyway.
  • We can also disable this caching. Please note that caching has no impact on the correctness of the data; it is just an optimisation.
  • In real-life examples, setting this period to around 200-300 ms greatly reduces the number of records forwarded to the downstream processors, but it also adds latency before updates become visible downstream.
  • One important attribute is ‘commit.interval.ms’: the maximum time the processor waits before emitting the records to the downstream processors and to the state store.
  • Another important attribute is ‘cache.max.bytes.buffering’: the number of bytes allocated for caching. This is the total buffer for all the threads running in one application instance. E.g. if we have 10 threads running on a single machine and set this value to 10 MB, each thread gets 1 MB of buffer memory.

Thus, the data is automatically flushed to the internal state store and to the downstream processors as soon as the earlier of the above 2 limits is hit. We can also disable this buffering by explicitly setting these values to ZERO.
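The caching behaviour described above can be modelled in a few lines of plain Java. This is a simplified sketch and not the Kafka Streams cache: flush() is called by hand here, whereas in a real application the flush is driven by the commit interval or by the cache filling up. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KTable record caching (illustration only).
final class TableCacheSketch {

    private final Map<String, String> cache = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    // Buffer the update; a newer value for the same key overwrites the older one.
    void update(String key, String value) {
        cache.put(key, value);
    }

    // Stand-in for "commit interval elapsed or cache full":
    // forward only the latest value per key, then clear the buffer.
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            forwarded.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }

    // The configured cache size is shared evenly by the threads of one instance.
    static long cacheBytesPerThread(long totalBytes, int threads) {
        return totalBytes / threads;
    }

    public static void main(String[] args) {
        TableCacheSketch table = new TableCacheSketch();
        for (int i = 1; i <= 7; i++) {
            table.update("K1", "V" + i);   // 7 updates within one wait period
        }
        table.flush();
        System.out.println(table.forwarded());                            // [K1=V7]
        System.out.println(cacheBytesPerThread(10L * 1024 * 1024, 10));   // 1048576
    }
}
```

Seven updates go in, one record comes out, and the downstream result is unchanged because the table would have ended up at V7 anyway.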

These values can be set different for different processors as well using the below approach :-

Let’s take a working example for KTables :- Say, at t=0, record <K1,V1> arrives in topic T1 and reaches the KTable and its internal state store. At t=1, another record <K1,V2> arrives in topic T1 and reaches the KTable and its internal state store. Then our KTable and internal state store have only one entry, i.e. <K1,V2>. Thus, the important pointers regarding KTable are :-

  • A KTable is an update stream backed by a local state store.
  • Records are always upserted into the KTable, i.e. new records are inserted and existing records are updated.
  • Any record with an existing key and a null value is treated as a delete operation for the KTable.
  • We can read messages from a Kafka topic into a KTable, apply transformations to it and convert it back to a KStream, depending upon the use case.
  • Each stream task works on a particular partition of the topic and has its own local copy of the KTable, and hence its own local copy of the internal state store (implemented internally using RocksDB) in which the KTable data is persisted.

Now, in some scenarios we may need a global state store which all tasks can access. In those cases, we can use a GlobalKTable. Let’s take an example to understand this concept :- Say we have a topic with 5 partitions and we want to read this topic into a table. We already know that partitions are the main idea behind parallel processing in Kafka, and hence we can run a maximum of 5 active instances of our application on this topic to achieve the maximum degree of parallelism. Say each of the 5 instances gets one of the 5 partitions to work on. In this case, each instance maintains its own local KTable. It may look something like this :-

If we read the data from this topic into a GlobalKTable instead, the GlobalKTable at each instance reads the data from all 5 partitions, and hence every instance possesses all the data. The picture would look something like this :-

Please note that assigning all partitions to every instance would be problematic for ordinary processing, as it would cause the same record to be processed multiple times; for lookup tables or broadcast-style stores, however, a GlobalKTable makes perfect sense. A GlobalKTable requires local storage at each instance (i.e. computer/machine/server), and it also increases network traffic and broker workload, because every instance now reads the entire data. It is fine to use a GlobalKTable for scenarios that involve a small amount of data which all instances need to refer to.

Adding Request / Response capability to the Streaming application :-

Next, we can also expose the internal state store to other applications, which amounts to setting up RESTful APIs to query it. This can be achieved with a small query server that the application itself provides on top of the Streams API (Kafka Streams does not ship an HTTP server of its own). Through the query server, we can find out at any point in time which <K,V> entries are present in the internal state store. Using the RESTful APIs, any client can ask very specific questions, which the Kafka Streams task answers by referring to its internal state store. The bottom line is that we are adding a request/response capability to the Kafka Streams application: it offers a REST interface through which other applications, even dashboards, can query the state store directly. A streaming application is well positioned to support this kind of querying, since the state already lives in the application. This feature is called “Interactive Queries” in Kafka Streams.

Some crucial points regarding State-Stores :-

  • Kafka Streams state stores are local in nature. In other words, the full state of the application is split across the multiple instances of the application.
  • Say we are running 3 instances of our streaming application. All 3 instances maintain their own local state stores, i.e. each state store is managed locally by an application instance on its own machine. The picture below demonstrates this notion.
  • Please note that, from the point of view of machine-1, the state store on its own machine is known as the local state store, whereas the state stores lying at the other instances (i.e. on machine-2 and machine-3) are termed remote state stores.

Querying to the State-Stores:-

A streaming application instance can query the locally managed portion of the state directly; querying the portions managed on other machines is also possible, but takes a little more machinery.

  • As of the date of writing this blog, Kafka Streams doesn’t provide any API to query a remote state store directly; however, it does provide APIs to find all other active instances and their host names. The instance running on machine-1 can even find out which of the other instances (running on other machines) hold the data it is querying for.
  • Thus, application instance 1 can forward the request to the other application instances and collect the response from the remote state stores.
  • The application state can be queried over an RPC layer, e.g. RESTful APIs, GraphQL, etc. The requestor should be able to reach out to any one instance of the streaming application and request the desired data.
  • Say the requestor queries instance-1. Instance-1 internally uses the Kafka Streams APIs to locate the requested data. If the data is available in its local state store, it returns the response to the requestor from there. If the data is present in some remote state store (say, instance-3’s state store), instance-1 queries that remote state store using the same RPC layer; it is just like forwarding the request internally. After receiving the data back from the remote state store, instance-1 sends it back to the requestor.

The above architecture hides whether the data is coming from a local or a remote state store. The requestor doesn’t even know in which state store the actual data lies, and it is free to query any of the available instances. Let’s now see the low-level details required to get this working :-

  • If there are 3 parallel application instances, each of them must have a unique value for ‘APPLICATION_SERVER_CONFIG’. A common way of achieving this is to use the instance’s own host (or IP) and port, in the form host:port.
  • We also need to create a service to serve the RPC calls.
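The request-forwarding flow described in this section can be sketched in plain Java. This is only a model: a shared map stands in for the instance-discovery metadata that Kafka Streams provides, and a direct method call stands in for the RPC (e.g. REST) hop between machines. The class name, host names and ports are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of interactive-query routing (illustration only).
final class QueryRoutingSketch {

    static final class Instance {
        final String hostPort;                                   // unique host:port of this instance
        final Map<String, Double> localStore = new HashMap<>();  // its local state store
        final Map<String, Instance> ownerByKey;                  // stand-in for metadata lookup

        Instance(String hostPort, Map<String, Instance> ownerByKey) {
            this.hostPort = hostPort;
            this.ownerByKey = ownerByKey;
        }

        // Answer from the local store if this instance owns the key;
        // otherwise forward to the owner (a method call stands in for the RPC).
        String query(String key) {
            Instance owner = ownerByKey.get(key);
            if (owner == null) {
                return "not found";
            }
            if (owner == this) {
                return hostPort + " (local): " + localStore.get(key);
            }
            return hostPort + " -> " + owner.query(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Instance> owners = new HashMap<>();
        Instance instance1 = new Instance("host1:7010", owners);
        Instance instance3 = new Instance("host3:7010", owners);
        instance1.localStore.put("C1", 25.0);
        owners.put("C1", instance1);
        instance3.localStore.put("C7", 40.0);
        owners.put("C7", instance3);

        System.out.println(instance1.query("C1")); // host1:7010 (local): 25.0
        System.out.println(instance1.query("C7")); // host1:7010 -> host3:7010 (local): 40.0
        System.out.println(instance1.query("C9")); // not found
    }
}
```

The requestor only ever talks to instance1; whether the answer came from the local or a remote store is visible here only because the sketch prints the route.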