System Design with Kafka Streams

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
  • We define the computational logic of a Kafka Streams application using the processor APIs, and this logic gets bundled into a Topology. A Topology is a unit of execution.
  • At runtime, the Kafka framework creates logical streaming tasks, and each of these stream-tasks copies the underlying topology object and executes it independently, in parallel.
  • The Kafka framework creates a fixed number of logical tasks on this single machine.
  • The number of tasks is equal to the number of partitions in the input topic. If there are 2 or more input topics, the number of tasks equals the largest number of partitions amongst the input topics. Thus, in our scenario, 3 tasks shall be created.
  • Next, the Kafka framework assigns the partitions to these tasks evenly, i.e. 1 partition from each topic to each task.
  • The initial assignment of partitions to tasks never changes; hence the number of tasks is fixed, and it is therefore the maximum degree of parallelism for the application. These are all logical tasks.
  • At this stage, these 3 tasks are ready to be assigned to the application threads or instances. Let's further assume that we started our application on 2 machines, each having a single thread. The framework then assigns 1 task to each instance, and the remaining 3rd task is assigned randomly to one of the instances.
  • Say we now want to scale up this application with another instance, i.e. an altogether new machine; the Kafka framework then automatically migrates one task to the new instance under the hood, without the user/developer even being aware of it.
  • Say we further add a few more instances; those new instances shall remain idle, because the degree of parallelism is capped by the number of partitions, and hence we don't have any new task to assign to them.
  • Fault tolerance: if an active instance-3 (on which kafka-streaming-task-3 is running) goes down, the framework reassigns its partition, and hence its task, to one of the surviving instances, where the task's state is restored before processing resumes.
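The task-count and assignment rules above can be sketched in plain Java. This is a simplified illustration of the rules, not the actual Streams internals; the topic names and partition counts are made up:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskAssignmentSketch {
    // Number of tasks = largest partition count amongst the input topics.
    public static int numTasks(Map<String, Integer> topicPartitions) {
        return Collections.max(topicPartitions.values());
    }

    // Round-robin assignment of tasks to instances: with fewer instances
    // than tasks, some instances receive more than one task; with more
    // instances than tasks, the extra instances stay idle.
    public static Map<Integer, List<Integer>> assign(int tasks, int instances) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int i = 0; i < instances; i++) assignment.put(i, new ArrayList<>());
        for (int t = 0; t < tasks; t++) assignment.get(t % instances).add(t);
        return assignment;
    }
}
```

With 2 input topics of 3 and 2 partitions, 3 tasks are created; on 2 instances one of them runs 2 tasks, and on 5 instances two instances stay idle.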
// CREATE CONFIGURATIONS
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// CREATE STREAMS TOPOLOGY
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
// kStream.peek((k, v) -> System.out.println("Key= " + k + " Value= " + v));
Topology topology = streamsBuilder.build();
// START THE STREAMS
KafkaStreams streams = new KafkaStreams(topology, props);
logger.info("Starting stream.");
streams.start();

// SHUTDOWN HOOK
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down stream");
streams.close();
}));
KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
  • filter() → returns a KStream
  • map() → returns a KStream
  • flatMap() → returns a KStream
  • to() → returns void; a terminating / sink processor.
  • foreach() → returns void; a terminal operation.
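As a rough analogy, using java.util.stream rather than the Kafka Streams DSL, the first three operators transform a stream into another stream, while the last two end the pipeline:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OperatorAnalogy {
    public static List<String> demo() {
        return List.of("a:1", "b:2", "c:3").stream()
                .filter(s -> !s.startsWith("b"))             // keep selected records
                .map(String::toUpperCase)                    // 1-to-1 transformation
                .flatMap(s -> Arrays.stream(s.split(":")))   // 1-to-many transformation
                .collect(Collectors.toList());               // terminal operation
    }
}
```

Here "b:2" is dropped by the filter, each surviving record is uppercased, and the flatMap splits each record into two, yielding ["A", "1", "C", "3"].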
StreamsBuilder builder = new StreamsBuilder();
KStream<String, PosInvoice> KS0 = builder.stream(AppConfigs.posTopicName,
Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));
  • For filtering, we pass a lambda expression of type Predicate<K,V> as an argument to KStream's filter() method. Essentially, we are supplying the logic to filter out unwanted records.
  • After filtering, we use the to() method, which internally creates a producer and pushes the records to the desired topic. The to() method is actually a SINK processor; we pass in 2 arguments: first the desired topic name (where the message record has to be pushed), and second the SERDE classes with which we want to serialize the key and value.
KS0
.filter((k, v) -> v.getDeliveryType().equalsIgnoreCase(
AppConfigs.DELIVERY_TYPE_HOME_DELIVERY))
.to(AppConfigs.shipmentTopicName, Produced.with(AppSerdes.String(), AppSerdes.PosInvoice()));
  • For filtering, we again pass a lambda expression of type Predicate<K,V> to KStream's filter() method, supplying the logic to filter out unwanted records.
  • After filtering, we need to convert these 'POS' type message records into 'Notification' type message records.
  • Next, we use the to() method, which internally creates a producer and pushes the records to the desired topic. The to() method is a SINK processor; we pass in 2 arguments: first the desired topic name, and second the SERDE classes with which we want to serialize the key and value.
KS0
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(
AppConfigs.CUSTOMER_TYPE_PRIME))
.mapValues(invoice -> RecordMaker.getNotificationTypeRecordFromInvoiceTypeRecord(invoice))
.to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));
KS0
.mapValues(invoice -> RecordBuilder.getMaskedInvoice(invoice))
.flatMapValues(invoice -> RecordBuilder.getHadoopRecords(invoice))
.to(AppConfigs.hadoopTopic, Produced.with(AppSerdes.String(), AppSerdes.HadoopRecord()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
  • First, we create a source processor to read the stream of invoices, i.e. KS0.
  • Then, from the KS0 stream, we retain only PRIME-customer-type invoices. Let's call this new stream KS3. At this stage, the processor involved is a stateless processor.
  • Next, we compute the reward points earned by the customer for this invoice and include them in a 'Notification' message record. We apply the mapValues() processor to map the Invoice-type message records to Notification-type message records. Let's call this new stream of 'Notification' records KS4.
  • Now, we want to add the previously earned points to the points earned through this invoice. We can achieve this with a new processor that queries a lookup table (to know the net total points earned by this customer so far), adds that total to the current reward points, updates the 'Notification' record, and then writes the updated total back to the same lookup table for the next query. Note that at this stage the processor involved is a stateful processor, because the processor of the KS4 stream needs to know previous information as well.
  • Finally, we send the updated 'Notification' record to another topic, to be picked up by the SMS service.
  • Non-persistent / in-memory store → risks being wiped out in case of a crash.
  • Persistent / remote databases (like Postgres, Cassandra, Aerospike, etc.) → present a performance bottleneck due to the network calls involved.
  • Fault-tolerant in-memory state stores.
  • Recoverable persistent state stores.
  • Stateless processors → stateless transformations don't require state and hence don't need a lookup table. E.g. processors such as mapValues(), filter(), flatMapValues(), etc.
  • Stateful processors → stateful transformations do require state, and need a lookup table (i.e. a state store) to compute the output. E.g. aggregations, joins, windowing, etc.
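The stateful rewards computation described above can be sketched with a plain HashMap standing in for the state store. The class and method names here are illustrative, not the actual application code:

```java
import java.util.HashMap;
import java.util.Map;

public class RewardsSketch {
    // The lookup table: customer card number -> total loyalty points so far.
    private final Map<String, Double> stateStore = new HashMap<>();

    // Stateful transformation: read the previous total, add the points
    // earned on this invoice, and write the new total back so the next
    // invoice of the same customer sees it.
    public double accumulate(String customerCardNo, double earnedPoints) {
        double total = stateStore.getOrDefault(customerCardNo, 0.0) + earnedPoints;
        stateStore.put(customerCardNo, total);
        return total;
    }
}
```

The second call for the same customer returns the running total, which is exactly the behaviour the stateless operators (filter, mapValues) cannot provide on their own.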
StreamsBuilder builder = new StreamsBuilder();
KStream<String, PosInvoice> KS0 = builder.stream(AppConfigs.posTopicName,
Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));
  • For filtering, we pass lambda-expression of type Predicate<K,V> as an argument to KStream’s method filter. Basically, here we are supplying the definition of the logic to filter-out unwanted records.
KStream<String, PosInvoice> KS1 = KS0.filter((key, value) ->
value.getCustomerType().equalsIgnoreCase(AppConfigs.CUSTOMER_TYPE_PRIME));
StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(AppConfigs.REWARDS_STORE_NAME),
AppSerdes.String(),
AppSerdes.Double()
);
builder.addStateStore(kvStoreBuilder);
StoreBuilder kvStoreBuilder1 = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.REWARDS_STORE_NAME),
AppSerdes.String(), AppSerdes.Double()
);
builder.addStateStore(kvStoreBuilder1);
KS1.transformValues(new ValueTransformerSupplier<PosInvoice, Notification>() {
@Override
public ValueTransformer<PosInvoice, Notification> get() {
// Business logic to transform
return new RewardsTransformer();
}
});
KS1.transformValues(() -> new RewardsTransformer(), AppConfigs.REWARDS_STORE_NAME)
.to(AppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));
  • We need to pass a 'ValueTransformerSupplier' to the transformValues() method.
  • The 'ValueTransformer' interface is provided by the Kafka Processor API. The Streams API offers us the transformValues() method to supply our business logic; it bridges the Kafka Streams DSL and the Kafka Processor API.
  • The init() method of 'ValueTransformer' is called only once. We use the ProcessorContext to get the state store. Here is the snippet for it:
private KeyValueStore<String, Double> stateStore;

@Override
public void init(ProcessorContext processorContext) {
this.stateStore = (KeyValueStore<String, Double>) processorContext.getStateStore(AppConfigs.REWARDS_STORE_NAME);
}
  • The transform() method is called once for every message in the stream. Here is what the snippet looks like:
@Override
public Notification transform(PosInvoice posInvoice) {
Notification notification = new Notification()
.withInvoiceNumber(posInvoice.getInvoiceNumber())
.withCustomerCardNo(posInvoice.getCustomerCardNo())
.withTotalAmount(posInvoice.getTotalAmount())
.withEarnedLoyaltyPoints(posInvoice.getTotalAmount() * AppConfigs.LOYALTY_FACTOR)
.withTotalLoyaltyPoints(0.0);
Double accumulatedRewards = stateStore.get(notification.getCustomerCardNo());
Double totalRewards;
if (accumulatedRewards != null)
totalRewards = accumulatedRewards + notification.getEarnedLoyaltyPoints();
else
totalRewards = notification.getEarnedLoyaltyPoints();
stateStore.put(notification.getCustomerCardNo(), totalRewards);
notification.setTotalLoyaltyPoints(totalRewards);

return notification;
}
  • On the producer side, we used the store ID as the message key to send the invoices to the Kafka topic, and we used the default partitioner. The default partitioner computes a hash of the message key to route messages to partitions. Therefore, all invoices belonging to the same key land in the same partition.
  • Below is how the structure of our application looks: we have 3 stores sending invoices to the 'POS' topic. This topic has 3 partitions, and there are 3 stream-processing tasks working in parallel to share the load.
KS0
.through(AppConfigs.REWARDS_TEMP_TOPIC,Produced.with(AppSerdes.String(), AppSerdes.PosInvoice(), new RewardsPartitioner()))
.transformValues(() -> new RewardsTransformer(), AppConfigs.REWARDS_STORE_NAME)
.to(AppConfigs.notificationTopic,Produced.with(AppSerdes.String(), AppSerdes.Notification()));
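The RewardsPartitioner class itself is not shown in this post; conceptually it implements Kafka Streams' StreamPartitioner interface and routes each invoice by customer card number rather than the original key, so that all invoices of one customer land on the same partition and hence on the same task's state store. A simplified, Kafka-free sketch of that routing rule (the hash choice is a stand-in; Kafka's actual default partitioner uses murmur2 over the serialized key):

```java
public class RewardsPartitionerSketch {
    // Route by customer card number: the same customer always maps to
    // the same partition, which is what the repartitioning step needs.
    // floorMod keeps the result non-negative even for negative hashes.
    public static int partitionFor(String customerCardNo, int numPartitions) {
        return Math.floorMod(customerCardNo.hashCode(), numPartitions);
    }
}
```

Because the mapping is deterministic, repeated invoices for one card number always reach the same task, restoring the locality that re-keying the stream would otherwise provide.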
  • Scenario #1: say we had 3 tasks initially, each working on 1 partition, and all of a sudden stream-task-3 crashes. With task-3 gone, the state store (which was local to task-3) is gone as well.
  • Scenario #2: say we had 1 task initially, working on all 3 partitions, and we then realised the need to scale the application by adding 2 new streaming tasks. The new tasks would need access to the state built from previously processed messages. Say customer C1's data was landing on partition-2, and P2 is now assigned to Task-2; Task-2 would first need the state-store information for C1.
  • Kafka Streams provides a fault-tolerant mechanism for processing real-time streams of data.
  • We create a KStream by opening a stream to a particular topic.
  • Working with a KStream is like working with individual messages, one at a time.
  • If we need to remember past information, or we want to work with a group of messages, we need a local state store. Every state store ultimately looks like a table.
  • The Kafka Streams API allows us to create tables, using an abstraction called KTable.
  • A KTable can also be viewed as a local key-value state store, where we store the message key and the corresponding value.
  • We create a KTable by opening it over a particular Kafka topic. Once we open a KTable for a topic, all the messages landing on that topic come and sit in this table.
  • A KTable can be visualised as a table with a primary key. There can be only 1 value for any key in the KTable; we can't have duplicates. Each data record in a KTable is an upsert (insert → update). If we put a record whose value is NULL, that record is deleted from the KTable.
  • One important difference between a KStream and a KTable is that in a KStream all records flow in sequence, with no relation between the current record and any previous record, whereas in a KTable a new record may update an older record if the key already exists in the state store/KTable.
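The upsert and tombstone-delete semantics above can be sketched with a map standing in for the KTable's state store (illustrative only, not the Streams implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class KTableSketch {
    private final Map<String, String> store = new HashMap<>();

    // Each incoming record is an upsert; a null value deletes the key.
    public void apply(String key, String value) {
        if (value == null) store.remove(key);
        else store.put(key, value);
    }

    public String get(String key) { return store.get(key); }
}
```

A second record for K1 updates the existing row rather than creating a duplicate, and a null value removes the row entirely.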
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName);
KT0.toStream().print(Printed.<String, String>toSysOut().withLabel("KT0"));
KTable<String, String> KT1 = KT0
.filter((k, v) -> k.matches(AppConfigs.regExSymbol) && !v.isEmpty(), Materialized.as(AppConfigs.stateStoreName));
KT1.toStream().print(Printed.<String, String>toSysOut().withLabel("KT1"));
  • Example 1: here, the KTable is created directly from the source topic and materialised as an internal state store named stateStoreName1.
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName, Materialized.as(AppConfigs.stateStoreName1));
  • Example 2: here, the KTable is derived by filtering another KTable and then materialised as an internal state store named stateStoreName.
KTable<String, String> KT1 = KT0.filter((k, v) -> k.matches(AppConfigs.regExSymbol) && !v.isEmpty(), Materialized.as(AppConfigs.stateStoreName));
  • The KTable is responsible for forwarding records to the downstream processors as well as to the internal state store (the underlying RocksDB database). However, records don't reach the downstream processors immediately: the KTable internally caches records in memory and waits a while for more records to arrive. This is an optimisation technique to reduce the number of operations.
  • The KTable uses this wait period to update records in memory and forwards only the latest record per key to the downstream processors. Say the wait period is configured as 100 ms, and during those 100 ms 7 records arrive for a particular key in the KTable; with this approach only 1 record is forwarded downstream, instead of performing the forwarding operation all 7 times.
  • Thus, only the latest record is sent to the downstream processors. The net result remains the same anyway, because the new record (key K1, value V2) would in any case have overridden the older record (key K1, value V1).
  • We have the option to disable this caching as well. Note that caching has no impact on the correctness of the data; it's just an optimisation technique.
  • In real-life examples, setting this period to around 200-300 ms greatly reduces the count of records forwarded to the downstream processors, but it also delays how quickly downstream processors see updates.
  • An important attribute is 'commit.interval.ms': the maximum time the processor waits before emitting records to the downstream processors or to the state store.
  • Another important attribute is 'cache.max.bytes.buffering': the number of bytes allocated for caching. This is the maximum buffer across all the threads of an application instance. E.g., if we have 10 threads running on a single machine and specify this value as 10 MB, each thread gets 1 MB of buffer memory.
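The record-caching behaviour can be sketched as a per-key buffer that is flushed on the commit interval, forwarding only the latest value per key. This is a deliberate simplification of what the Streams cache actually does:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ForwardingCacheSketch {
    // Between commits, keep only the latest value for each key.
    private final Map<String, String> buffer = new LinkedHashMap<>();

    public void put(String key, String value) {
        buffer.put(key, value); // overwrites earlier values for the same key
    }

    // Called at every commit.interval.ms: emit the latest value per key
    // to the downstream processors and clear the buffer.
    public Map<String, String> flush() {
        Map<String, String> out = new LinkedHashMap<>(buffer);
        buffer.clear();
        return out;
    }
}
```

Seven updates to the same key within one interval collapse into a single forwarded record, which is the reduction in downstream operations the bullets above describe.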
  • KTables are an update stream backed by a local data store.
  • Records are always upserted into the KTable, i.e. new records are inserted and existing records are updated.
  • Any record with an existing key and a null value is treated as a delete operation on the KTable.
  • We can read messages from a Kafka topic into a KTable, apply transformations on it, and convert it back into a KStream, depending on the use case.
  • Each streaming task works on a particular partition of the topic and has its own local copy of the KTable, and hence a local copy of the internal state store (implemented internally using RocksDB), where the KTable data is persisted.
  • Kafka Streams state stores are local in nature. In other words, the full state of the application is spread across the multiple instances of the application.
  • Say we are running 3 instances of our streaming application; in that case all 3 instances maintain their own local state stores, i.e. the state store is managed locally by each streaming-application instance on its respective machine. The picture below demonstrates this notion.
  • Note here that, for machine-1, the state store on its own machine (i.e. on machine-1) is known as the local state store, whereas for machine-1 the state stores on the other instances (i.e. on machines 2 & 3) are termed remote state stores.
  • As of the date of writing this blog, Kafka Streams applications don't provide any API to query a remote state store directly; however, they do provide APIs to find the list of all other active instances and their hostnames. Machine-1, powering streams-application-1, can even find out whether the other streaming applications (running on other machines) have the data that app-1 is querying for.
  • Thus, streaming-application-1 can forward the request to the other streaming-application instances and collect the response from the remote state stores.
  • The application state can be queried over an RPC layer, e.g. RESTful APIs, GraphQL, etc. The requestor should be able to reach out to any one instance of the streaming application and request the desired data.
  • Say the requestor queries instance-1; instance-1 then internally uses the Kafka Streams APIs to locate the requested data. If the requested data is available in the local state store, it returns the response to the requestor from its local state store. If the requested data is present in some remote state store (say, instance-3's state store), instance-1 queries the remote state store using the same RPC layer, just like forwarding the request internally. After receiving the data back from the remote state store, it sends the data back to the requestor.
  • If there are 3 parallel streaming-application instances, all 3 instances should have a unique value for 'APPLICATION_SERVER_CONFIG'. One common way of achieving this is to use the combination of IP + port.
  • We also need to create a service to serve the RPC calls.
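A typical way to set this configuration looks like the fragment below; the host and port values are placeholders, and each instance must advertise its own unique, reachable endpoint:

```java
// Each instance advertises its own RPC endpoint; the host:port pair must
// be unique per instance and reachable by the other instances.
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "10.0.0.1:7001");
```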
