Confluent Kafka: First-hand Experience with Transactions

aditya goel
Nov 26, 2022


Step 1st.) Start ZooKeeper in one terminal :-

/Users/aditya/Documents/INSTALLS/confluent-5.5.1

aditya-MAC:confluent-5.5.1 aditya$ cd bin/

aditya-MAC:bin aditya$ ./zookeeper-server-start ../etc/kafka/zookeeper.properties

Step 2nd.) Create three separate copies of server.properties, since we are going to set up a three-node Kafka cluster on our machine :-

Below can be properties for 1st Broker :-

broker.id=0

listeners=PLAINTEXT://:9092

log.dirs=../tmp/kafka-logs-0

confluent.metadata.server.listeners=http://0.0.0.0:8090

confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090

Below can be properties for 2nd Broker :-

broker.id=1

listeners=PLAINTEXT://:9093

log.dirs=../tmp/kafka-logs-1

confluent.metadata.server.listeners=http://0.0.0.0:8091

confluent.metadata.server.advertised.listeners=http://127.0.0.1:8091

Below can be properties for 3rd Broker :-

broker.id=2

listeners=PLAINTEXT://:9094

log.dirs=../tmp/kafka-logs-2

confluent.metadata.server.listeners=http://0.0.0.0:8092

confluent.metadata.server.advertised.listeners=http://127.0.0.1:8092

Step 3rd.) Start three Kafka broker nodes (a three-node cluster) in separate terminals :-

(This shall start up the Kafka brokers at the following ports :- 9092, 9093 & 9094).

In another terminal :-

aditya-MAC:bin aditya$ ./kafka-server-start ../etc/kafka/server0.properties

In another terminal :-

aditya-MAC:bin aditya$ ./kafka-server-start ../etc/kafka/server1.properties

In another terminal :-

aditya-MAC:bin aditya$ ./kafka-server-start ../etc/kafka/server2.properties

Step 4th.) Create a topic named ‘stock-picks’.

aditya-MAC:bin aditya$ ./kafka-topics --create --topic stock-picks --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

Created topic stock-picks.

aditya-MAC:bin aditya$

Step 5th.) Start two separate Kafka console consumers in separate terminals :-

Note: They should belong to the same group and should be reading from the same cluster of brokers.

In another terminal, run first instance of consumer:-

aditya-MAC:bin aditya$ ./kafka-console-consumer --topic stock-picks --bootstrap-server localhost:9092 --from-beginning --group group1

In another terminal, run second instance of consumer:-

aditya-MAC:bin aditya$ ./kafka-console-consumer --topic stock-picks --bootstrap-server localhost:9092 --from-beginning --group group1

Step 6th.) Send the CSV file to the afore-created topic (i.e. ‘stock-picks’) using the Kafka console producer :-

aditya-MAC:bin aditya$ pwd

/Users/aditya/Documents/INSTALLS/confluent-5.5.1/bin

aditya-MAC:bin aditya$ ./kafka-console-producer --topic stock-picks --broker-list localhost:9092 < ../../DATASET/data/sample1.csv

>>aditya-MAC:bin aditya$

Now, as soon as the aforesaid file is produced to Kafka, it is consumed by the two consumers :- (the output below is nothing but the data consumed by the Kafka consumers).

Output of terminal where 1st consumer is running :-

ZYLOG,BE,0.8,0.9,0.8,0.85,0.85,0.85,2316,1993.8,05-NOV-2018,11,INE225I01026,

^CProcessed a total of 1153 messages

Output of terminal where 2nd consumer is running :-

UTISENSETF,EQ,364.74,365,360.05,364.98,364.98,364.75,38,13794.51,05-NOV-2018,13,INF789FB1X58,

^CProcessed a total of 754 messages

Knowledge Bits :-

ZooKeeper maintains the list of active brokers as ephemeral znodes. That's how it keeps a check on the situation where one particular broker drops out of the cluster. Let's check out our use-case using the ZooKeeper shell :-

aditya-MAC:bin aditya$ pwd

/Users/aditya/Documents/INSTALLS/confluent-5.5.1/bin

aditya-MAC:bin aditya$ ./zookeeper-shell localhost:2181

Connecting to localhost:2181

Welcome to ZooKeeper!

JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

Let's list out the ZooKeeper root directory :-

ls /

[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

Let's list out the ZooKeeper directory for /brokers :-

ls /brokers

[ids, seqid, topics]

Let's list out the ZooKeeper directory for /brokers/ids :-

ls /brokers/ids

[0, 1, 2]

Sending Data via Java APIs :-

for (int i = 1; i <= AppConfigs.numEvents; i++) {
    // Send a counter-keyed message to each of the two topics inside the ongoing transaction.
    producer.send(new ProducerRecord<>(AppConfigs.topicName1, i, "Test-message-" + i));
    producer.send(new ProducerRecord<>(AppConfigs.topicName2, i, "Test-message-" + i));
    //Thread.sleep(100);
}
logger.info("Aborting Second Transaction and going to sleep :-");

The producer.send() method hands over the producer record to the Kafka producer object. The send() method returns immediately, without waiting for the response.

The Kafka producer object internally serializes the message key and message value based upon the serializers provided in the properties.

Then the producer determines the target partition number for the message to be delivered. We can provide a custom partitioner class using the properties object, or let the producer use the default partitioner.

The serialized message then sits in a buffer, chosen based upon the destination address. For each destination (i.e. partition), there is a separate buffer.

Finally, an I/O thread running in the background picks up messages from the buffer, combines them into a single data packet, and sends it to the Kafka cluster of brokers. The broker saves this data to its log file and sends back an acknowledgement to the I/O thread.

If the I/O thread doesn't receive the acknowledgement, it resends the message and again waits for the acknowledgement. If it still doesn't receive the acknowledgement even after some retries, the I/O thread gives the error back to the producer.send() caller.
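
To make the above flow concrete, below is a minimal sketch of an asynchronous producer (the topic name, broker addresses and message contents are assumptions carried over from the earlier steps, not part of the original demo) :-

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); // assumed local 3-node cluster
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());   // key serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // value serializer
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                // send() only hands the record over to the producer's buffer and returns immediately;
                // the background I/O thread batches buffered records and ships them to the brokers.
                producer.send(new ProducerRecord<>("stock-picks", i, "Test-message-" + i));
            }
        } // close() flushes the buffer and waits for the outstanding acknowledgements.
    }
}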

Kafka-producer At-Least once semantics :-

Apache Kafka provides a message-durability guarantee by committing the message at the Kafka partition. It means that, once the message is persisted into the leader partition, we can't lose that message as long as the leader is alive. However, if the leader partition goes down, we may lose the data. To protect the data from loss on failure, we have replication. Kafka implements replication using followers. Follower nodes copy the data from the partition-leader node and provide fault tolerance. In other words, once the message gets persisted on all the nodes in the ISR list, the message is considered to be fully persisted. So, we are in a position to retain the message/record as long as at least one of those replicas is still up. This can be achieved using the property: acks=all.
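
A hedged sketch of the producer properties that drive this at-least-once behaviour (the broker addresses and retry count are illustrative, not from the original setup) :-

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AtLeastOnceConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for the leader plus all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // keep retrying until an acknowledgement arrives
        return props;
    }
}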

In the aforesaid system, there is a possibility of duplicate records being pushed to the Kafka brokers. Let's see how :-

Say a message was pushed from the producer to the cluster and, unfortunately, the producer failed to receive the acknowledgement back from the cluster because of a network issue; it shall automatically retry sending that same message. Example: say the I/O thread transfers a message to the broker, the broker receives the data well and stores it into the partition log, the broker now tries to send an acknowledgement for this message, and the response doesn't reach back to the I/O thread in time due to a network error. In this case, the producer I/O thread waits for some more time and then resends the message to the broker, assuming failure. Now, the broker again receives the data, and it doesn't have any means of identifying whether it's duplicate data or not; therefore it simply saves that duplicate message to its partition log again and sends the response back to the I/O thread.

This retry mechanism continues until the producer I/O thread receives the success acknowledgement. Hence, it makes sure that the message is received at the broker at least once.

Kafka-producer At-Most once semantics :-

We can set the property “retries=0” in order to achieve at-most-once semantics. With this approach, we may well lose some messages, but no message shall be delivered more than once.

Kafka-producer Exactly once semantics / Idempotent :-

We can achieve producer-level exactly-once behaviour by setting the “enable.idempotence” property to true. This very well protects us from the aforementioned problems, but cannot protect us from application-level retries or duplicates.
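
As a quick contrast of the last two sections, here is a small fragment (not from the original code; values are illustrative) showing the two toggles side by side :-

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliverySemanticsConfig {
    // At-most-once: no retries, so a lost acknowledgement never causes a resend (messages may be lost).
    public static Properties atMostOnce() {
        Properties props = new Properties();
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        return props;
    }
    // Producer-level exactly-once: the broker de-duplicates retried batches using the producer-id and sequence numbers.
    public static Properties idempotent() {
        Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return props;
    }
}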

Transactional Kafka-producer semantics / Atomicity :- With this mode, say we want to push ’n’ records to Kafka; then either all ’n’ records shall be pushed, or no record at all shall be pushed.

We need to set the ‘TRANSACTIONAL_ID_CONFIG’ property on the producer. This is a mandatory requirement.

When we set this transactional_id_config property, idempotency is automatically enabled because transactions are dependent upon idempotency.

The transactional_id_config property should be unique for each producer instance, i.e. we can't run two or more instances of a producer with the same transactional_id. If we do so, one of those transactions shall be aborted, because two instances of the same transaction are illegal.

The primary purpose of the transactional_id is to roll back the older unfinished transactions for the same transactional_id in case of producer application bounces or restarts.

Therefore, to achieve horizontal scalability at the producer end (i.e. to have multiple concurrent Kafka producers running), each producer instance can set a different transactional_id for itself. Hence, all the transactions emerging from the different producers bear different transactional_ids.
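
For illustration, a small fragment (the environment-variable name and the id prefix are assumptions) showing how each instance could derive its own transactional_id :-

// Assumption: each running instance is handed a distinct INSTANCE_ID, e.g. via an environment variable.
String instanceId = System.getenv().getOrDefault("INSTANCE_ID", "1");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "stock-picks-producer-" + instanceId);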

For implementing transactions with Kafka, the following steps need to be followed :-

a.) Initialise the transactions by calling producer.initTransactions(). It makes sure that any other transaction initiated by a previous instance of the same producer is closed, i.e. if an application instance dies, then the next instance can be guaranteed that earlier transactions either finished completely or were aborted, leaving the new instance in a clean and neat state. This also retrieves the internal producer-id that shall be used in all future messages sent by the producer.

The producer-id is used by the broker to implement idempotence.

b.) The next step is to wrap all the send calls within a producer.beginTransaction() and producer.commitTransaction() block. In case we receive some exception (while sending to Kafka) from which we can't recover, we simply abort the transaction by invoking producer.abortTransaction() and close the producer. The commitTransaction() call will flush any unsent records before committing the transaction. If any of the send calls failed with an irrecoverable exception, commitTransaction() shall throw that exception and you are supposed to roll back the whole transaction. That's a reasonable ask as well. Hence we are in a position to achieve the transactional property of either ALL or NOTHING.

In case of a multi-threaded system, we may call the send API from multiple threads; however, we must begin the transaction before spawning those threads, and either commit or abort it when all threads are done.

c.) The last point to note is: the same producer instance cannot have multiple open transactions. We must either commit or abort the ongoing transaction before we begin a newer transaction.
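
Putting steps a), b) and c) together, here is a minimal transactional-producer sketch (the topic name, transactional id and message contents are assumptions for illustration) :-

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "stock-picks-producer-1"); // mandatory; also enables idempotence
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // step a: closes/aborts unfinished transactions of any previous instance
        try {
            producer.beginTransaction(); // step b: wrap all sends in one transaction
            for (int i = 1; i <= 10; i++) {
                producer.send(new ProducerRecord<>("stock-picks", i, "Test-message-" + i));
            }
            producer.commitTransaction(); // flushes unsent records, then commits atomically
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            producer.close(); // fatal errors: the producer can only be closed
        } catch (KafkaException e) {
            producer.abortTransaction(); // any other failure: roll back the whole batch (all or nothing)
        }
        producer.close();
    }
}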

Kafka-producer Synchronous send :-

In this approach, the producer waits for an acknowledgement after sending the data to the broker. The “get” method on the Future returned by “send” is synchronous and blocking, and waits for the acknowledgement. The “get” method throws an exception in case of failure, or returns the record metadata in case of success.

There is a significant impact on speed and performance when using the synchronous send approach.
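
A minimal sketch of the synchronous pattern (again, the topic name and message are assumptions) :-

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyncSendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges (returns metadata) or throws on failure.
            RecordMetadata metadata = producer.send(new ProducerRecord<>("stock-picks", 1, "Test-message-1")).get();
            System.out.printf("Persisted to partition %d at offset %d%n", metadata.partition(), metadata.offset());
        }
    }
}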

Kafka-producer callback methods :-

To handle high throughput and also to know about failed messages from Kafka's send method, we can supply a producer callback, which shall be invoked once the acknowledgement comes back. Thus, the I/O thread calls the callback method once it itself receives the acknowledgement back from the broker.
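
A small fragment (reusing the producer instance from the earlier sketches; topic name and log messages are illustrative) showing a callback-based send :-

producer.send(new ProducerRecord<>("stock-picks", 1, "Test-message-1"), (metadata, exception) -> {
    // Invoked by the producer's I/O thread once the acknowledgement (or the error) comes back from the broker.
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.printf("Ack received: partition %d, offset %d%n", metadata.partition(), metadata.offset());
    }
});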

Kafka consumer APIs :-

Problem #1. Say a consumer was part of a consumer group and this group had 10 consumers in it originally, each handling one partition. Say one of the consumers died suddenly; the partition handled by this consumer shall be auto-transferred to one of the other nine consumers. The records already processed by the consumer which died must NOT be processed again by the consumer picking up this partition.

The solution is to use the Kafka committed offset.

As a concept, the offsets are committed only when the next poll is done by the Kafka consumer, because only at the next poll does the Kafka broker come to know that the previous messages (polled by this consumer) have been processed successfully.

There is yet another problem with using committed offsets. Say a consumer processed messages successfully (say, transferred the funds) and then crashed, without polling the next batch. For those records (consumed by the crashing consumer) the offsets have not yet been committed, so there is every chance of duplicate processing of these events.

The solution to this problem is using transactions with Kafka along with manual offset management. Taking control of offset committing away from Kafka is the way to address this.
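
A minimal sketch of manual offset management (the group id and topic follow the earlier console-consumer setup; process() is a hypothetical stand-in for the business logic) :-

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // take offset committing away from Kafka
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("stock-picks"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical business logic, e.g. the funds transfer
                }
                consumer.commitSync(); // commit offsets only after the polled batch is fully processed
            }
        }
    }
    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}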

Use-case-2 :- This looks hard to achieve using consumer APIs :-

Say we receive all transactional records (petrol-pump wise) into a Kafka topic and we have to compute the net sales per petrol pump. Let's see what solution we have from the consumer's point of view :-

Note: The format of the message record being pushed to Kafka is :-

private String customerName;

private String customerAddress;

private Integer petrolPumpId;

private String typeOfFuelPurchased;

private String fuelquantityPurchased;

private Long netSales;

private Date timeStamp;

We should maintain an in-memory map of :- <petrolPumpId, sales>.

HashMap<Integer, Long> storeIdToSales = new HashMap<>(); // key: petrolPumpId, value: net total sales

Now, whenever we read a record, we fetch the petrolPumpId and netSales from that record and add it to the existing entry in the in-memory map.
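
For illustration, a small fragment of that aggregation loop (PumpTransaction is a hypothetical POJO matching the fields listed above; the consumer and deserializer setup is omitted) :-

HashMap<Integer, Long> storeIdToSales = new HashMap<>();
ConsumerRecords<String, PumpTransaction> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, PumpTransaction> record : records) {
    PumpTransaction txn = record.value();
    // Add this record's netSales to the running total for its petrolPumpId.
    storeIdToSales.merge(txn.getPetrolPumpId(), txn.getNetSales(), Long::sum);
}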

The problem with this approach is: if the consumer crashes, all the data is gone, as the map is maintained in memory.

Another solution is to maintain this aggregate information in some data store like Aerospike or DynamoDB, but there is a lot of code that we shall have to write to achieve all of this.

That's all in this blog, and we will see you in the next series. If you liked reading this blog, do clap on this page.
