Real-time data pipelines with Kafka :- Let’s first see how Kafka is used in setting up real-time data pipelines.
- First, we have multiple data sources that we want to onboard onto Kafka. We have two options here: either Kafka producers or Kafka source connectors.
- Next, we might have to perform some processing on the data received in the Kafka topics. We could use a Kafka Streams application, which consumes from Kafka topics, performs the processing and writes the results back into Kafka topics.
- Finally, after processing, we might want to push the processed data out to another data store. We can use Kafka sink connectors for this.
Purpose of KSQL :- At times, writing a Streams application can become a complex task; another option is to use KSQL. With KSQL, we write SQL-like queries, and KSQL generates a Kafka Streams application in the background. Thus, KSQL allows us to write a Kafka Streams application, but in a much simpler way. Note that Streams applications are generated underneath, so we can scale KSQL just as easily as we scale a Streams application.
How does KSQL work :- Generally, we use the KSQL CLI, which communicates with the KSQL server, which in turn generates Kafka Streams applications under the hood.
Installing the KSQL CLI :- We shall be using Confluent Platform version 6.1 for this blog. Let’s first install the confluent CLI into our Confluent Platform’s bin directory. Note that the confluent CLI is meant for a local development environment, not for a production setup.
Let’s verify whether the confluent CLI has been installed :-
Next, let’s set up the CONFLUENT_HOME path for the Confluent home directory in our .bashrc file :-
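Assuming Confluent Platform has been unpacked under a path such as `~/confluent-6.1.1` (the exact path is an assumption; adjust it to your installation), the `.bashrc` entries might look like:

```shell
# Hypothetical install location -- adjust to wherever Confluent Platform is unpacked
export CONFLUENT_HOME=~/confluent-6.1.1
# Put the Confluent binaries (confluent, ksql, kafka-topics, ...) on the PATH
export PATH=$PATH:$CONFLUENT_HOME/bin
```

After editing the file, run `source ~/.bashrc` so the current shell picks up the changes.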
Next, let’s start the KSQL server on our local machine using the Confluent Platform CLI :-
Next, let’s verify whether the Confluent services have started locally :-
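With the confluent CLI on the PATH, starting and checking the local stack can be sketched roughly as follows (the `confluent local` commands are for development use only):

```shell
# Start all local Confluent Platform services (ZooKeeper, Kafka, ksqlDB server, etc.)
confluent local services start

# Verify which services are up
confluent local services status
```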
Next, let’s connect to the KSQL CLI and, through it, to the KSQL server. Note that both our ksql-cli and ksql-server versions are 6.1.1 :-
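Connecting the CLI to a KSQL server listening on the default local port 8088 looks like:

```shell
ksql http://localhost:8088
```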
Exploring the KSQL CLI :- Next, let’s explore some of the important KSQL CLI commands. The first important command is ‘list topics’. This command interrogates the Kafka brokers and lists all the topics in our Kafka cluster.
Let’s create a new topic called ‘USERS’ using the Kafka binaries shipped with the Confluent software :-
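A minimal sketch of creating the topic with the kafka-topics binary (the partition and replication counts here are assumptions suitable for a single-node dev setup):

```shell
kafka-topics --bootstrap-server localhost:9092 \
  --create --topic USERS \
  --partitions 1 --replication-factor 1
```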
Let’s check whether the newly created topic shows up in the KSQL CLI. Note that topic names are always case-sensitive.
Next, we can also watch any new data landing in the ‘USERS’ topic through the KSQL CLI using the following command :-
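In the KSQL CLI, a topic’s incoming messages can be watched with the PRINT command:

```sql
PRINT 'USERS';
```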
Let’s produce some data to this new topic and observe whether the incoming data is really visible through the CLI :-
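Producing a few comma-separated records can be sketched with the console producer (the sample values are made up for illustration):

```shell
kafka-console-producer --bootstrap-server localhost:9092 --topic USERS
>Alice,US
>Bob,GB
```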
Note that, by default, KSQL shows us only the latest data arriving in the topic, not the historical data. Let’s view the entire (historical) data present in this topic since the beginning. Also note that the command does not terminate; after displaying all the historical data, it keeps waiting for new incoming data :-
Say we want to see only a few records from the beginning rather than everything; below is the command we can use :-
Say we want to see a few records from the beginning, but not sequentially; rather, every 4th record. We can use the command below :-
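The historical-data variants of PRINT can be sketched as follows (the limit and interval values are just examples):

```sql
-- Everything since the beginning (keeps waiting for new data afterwards)
PRINT 'USERS' FROM BEGINNING;

-- Only the first 2 records from the beginning
PRINT 'USERS' FROM BEGINNING LIMIT 2;

-- Every 4th record, 2 records in total
PRINT 'USERS' FROM BEGINNING INTERVAL 4 LIMIT 2;
```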
Introduction to Kafka Streams :- Streams are fundamental to any event-driven architecture. A stream in Kafka is the full history of events from the beginning of time until now. Messages in a stream are constantly arriving and being appended to the topic. Messages are independent of one another and form a never-ending, time-ordered sequence. Following are some examples of streams :-
- Website click-stream.
- Orders arriving in a warehouse.
- Tweets arriving on Twitter.
Introducing Push Queries :- These queries run continuously and continuously output results. Push queries keep emitting results until they are either terminated by the user or they exceed the LIMIT condition.
Until KSQL 5.3, push queries were the default, and the ‘EMIT CHANGES’ clause did not need to be mentioned explicitly. From ksqlDB 5.4 onwards, the ‘EMIT CHANGES’ clause has been made mandatory for push queries. Following is an example of a push query.
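Assuming a stream named `users_stream` exists (the name is an assumption for illustration), a push query in ksqlDB 5.4+ looks like:

```sql
SELECT * FROM users_stream EMIT CHANGES;
```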
Demonstration of stream creation using KSQL with CSV data :-
Create Clause :- While creating a stream, we have to supply the underlying topic from which the stream will read its data, and we also need to specify how the data is encoded. In this example, since the data we supplied above through the console is in comma-separated format, we use VALUE_FORMAT ‘DELIMITED’.
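A sketch of such a CREATE STREAM statement over the ‘USERS’ topic (the column names are assumptions matching the comma-separated sample data):

```sql
CREATE STREAM users_stream (username VARCHAR, countrycode VARCHAR)
  WITH (KAFKA_TOPIC = 'USERS', VALUE_FORMAT = 'DELIMITED');
```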
List Clause :- Next, we can list all the available streams through the KSQL CLI :-
Select Clause :- Next, let’s look at the data in this stream. Note that, by default, KSQL only shows us new/future data, not the data already present.
Now we manually push data through the Kafka console producer and again watch the data arrive in the stream :-
Select Clause :- In order to view the historical data from the stream, we can set auto.offset.reset to ‘earliest’. Note that this property change applies only to the current session; if we exit and re-enter the KSQL CLI, we have to set this property again.
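Setting the property for the current CLI session and re-querying can be sketched as (the stream name is an assumption):

```sql
-- Applies only to this KSQL CLI session
SET 'auto.offset.reset' = 'earliest';

SELECT * FROM users_stream EMIT CHANGES;
```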
Aggregate Clause :- Let’s apply a basic aggregation (group-by) to the data present in this stream. We shall group users and count them by countryCode :-
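The group-by described above can be sketched as (stream and column names are assumptions):

```sql
SELECT countrycode, COUNT(*)
FROM users_stream
GROUP BY countrycode
EMIT CHANGES;
```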
Delete Clause :- Let’s now delete a stream. In the example below, we also delete the underlying topic :-
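A sketch, assuming the stream name from the earlier example; the DELETE TOPIC clause removes the underlying Kafka topic as well:

```sql
DROP STREAM users_stream DELETE TOPIC;
```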
Demonstration of stream creation using KSQL with JSON data :-
Let’s first create a Kafka topic named ‘USER_PROFILE’ using the Kafka console tools.
Now we can list this topic from the KSQL CLI :-
Next, let’s produce some data into the aforementioned topic through the Kafka console producer.
Next, let’s create a stream from this topic. Note that, since the data present in the topic is JSON, the VALUE_FORMAT is also JSON.
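A sketch of the JSON-backed stream (the exact field list is an assumption based on the user-profile data discussed in this blog):

```sql
CREATE STREAM user_profile (
    userid      INT,
    firstname   VARCHAR,
    lastname    VARCHAR,
    countrycode VARCHAR,
    rating      DOUBLE
  ) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');
```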
Next, we can list all the streams available in KSQL. We are able to see the stream we just created :-
Next, we can also see the structure of this stream using the ‘describe’ command :-
Next, since we pushed messages onto the Kafka topic, we can also see the data arriving in this stream :-
Say we push this data to the Kafka topic, where values for some of the fields are missing :-
If we now observe the attributes whose values were omitted, they come through as null :-
Introducing the KSQL DataGen Tool :- KSQL also provides the KSQL DataGen tool, with whose help test data can be generated. The data can be generated against a customised schema as well. We shall generate User_Profile-type data, as discussed above in this blog. Sample data is shown below :-
With the help of this tool, we can generate data in various formats like CSV, JSON, AVRO, etc. We can also specify the rate at which data should be produced, as well as the maximum number of records. Let’s use this tool.
First, we have to specify the options from which the synthetic test data shall be generated. So let’s create a “userprofile.avro” file. This is the schema used to generate the data :-
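The schema file follows the Avro random-generator format, where each field carries an “arg.properties” hint describing how its values are drawn. A minimal sketch (all field names and sample values here are illustrative assumptions):

```json
{
  "namespace": "ksql",
  "name": "userprofile",
  "type": "record",
  "fields": [
    {"name": "userid",
     "type": {"type": "int",
              "arg.properties": {"iteration": {"start": 1000, "step": 1}}}},
    {"name": "firstname",
     "type": {"type": "string",
              "arg.properties": {"options": ["Alice", "Bob", "Carol"]}}},
    {"name": "lastname",
     "type": {"type": "string",
              "arg.properties": {"options": ["Smith", "Jones", "Fawcett"]}}},
    {"name": "countrycode",
     "type": {"type": "string",
              "arg.properties": {"options": ["AU", "IN", "GB", "US"]}}},
    {"name": "rating",
     "type": {"type": "double",
              "arg.properties": {"options": [2.2, 3.4, 3.9, 4.4]}}}
  ]
}
```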
Next, we trigger the ksql-datagen utility, and it starts generating data according to the aforesaid schema :-
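Invoking the generator can be sketched as follows (the rate and iteration counts are arbitrary examples; older versions used maxInterval instead of msgRate to throttle output):

```shell
ksql-datagen schema=./userprofile.avro \
  format=json topic=USER_PROFILE key=userid \
  msgRate=1 iterations=100
```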
We can also verify through KSQL that the data is being received into the Kafka stream :-
Note that the following is the schema of our Kafka stream, as viewed from the KSQL CLI :-
KSQL-introduced metadata :- Metadata attributes (such as ‘ROWTIME’) are automatically added to each Kafka stream created through the KSQL CLI :-
KSQL built-in scalar functions :- The aforesaid ‘ROWTIME’ data is not in a human-readable format. We can use the built-in scalar functions provided by KSQL :-
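The epoch-milliseconds ROWTIME can be rendered readable with the TIMESTAMPTOSTRING function, e.g. (the stream and column names are assumptions):

```sql
SELECT ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS createtime,
       firstname
FROM user_profile
EMIT CHANGES;
```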
KSQL data concatenation and upper-case function :- The data of more than one attribute can be combined into another attribute using the “+” symbol. We have also used another function, ‘UCASE’, to transform values to upper case.
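Concatenation with ‘+’ and UCASE can be sketched as (column names assumed from the user-profile stream):

```sql
SELECT firstname + ' ' + UCASE(lastname) + ' from ' + countrycode AS description
FROM user_profile
EMIT CHANGES;
```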
KSQL creating new columns from existing columns :- We can also build new attributes/columns from existing columns. Let’s take the example below, where we use a ‘CASE’ statement; essentially, we have bucketed the data here. Note that we need not always type KSQL queries directly into the CLI; rather, we can write the query in a file (which also lets us prettify the query) and then execute it like a script.
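The bucketing via a CASE statement can be sketched as (the rating thresholds and labels are made-up examples):

```sql
SELECT firstname,
       CASE
         WHEN rating < 2.5 THEN 'Poor'
         WHEN rating <= 4.2 THEN 'Good'
         ELSE 'Excellent'
       END AS rating_description
FROM user_profile
EMIT CHANGES;
```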
KSQL building streams from streams :- Let’s build another stream from the existing original stream. We apply a handful of the functions we saw above, including concatenation, UCASE and CASE statements, to derive meaningful data from the original stream into a new stream. In a sense, we are building logic on top of existing logic (i.e. a stream on top of another stream).
Usually, we use an IDE to write this kind of script and then run it through KSQL as shown below; this is the preferred practice and speeds up the development cycle. The new stream is thus created :-
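The derived stream and its script file can be sketched as follows (the new stream’s name, the file name and the selected expressions are all assumptions):

```sql
-- contents of user_profile_pretty.ksql
CREATE STREAM user_profile_pretty AS
  SELECT firstname + ' ' + UCASE(lastname) + ' from ' + countrycode AS description,
         rating
  FROM user_profile;
```

From the KSQL CLI, the file can then be executed with `RUN SCRIPT 'user_profile_pretty.ksql';`.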
We can now peep into the newly created stream and observe the transformed data present in it :-
Next, we can also view the details of the stream by describing it. This provides the definition of the new stream and shows that a total of 103 messages have been processed so far. The ‘describe extended’ command also tells us the rate at which messages are being received by the stream.
KSQL dropping streams :- Let’s now drop the stream that we just created :-
It seems there already exists a write query on this stream, so let’s first terminate that query :-
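The terminate-then-drop sequence can be sketched as follows; the query id shown here (CSAS_USER_PROFILE_PRETTY_0) is an assumption, and the actual id is listed by ‘show queries;’:

```sql
SHOW QUERIES;
TERMINATE CSAS_USER_PROFILE_PRETTY_0;
DROP STREAM user_profile_pretty;
```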
Now we can successfully delete the stream :-
We can verify again that the deleted stream no longer exists :-
Introducing KSQL Tables :- A Kafka stream represents an unending sequence of events on a Kafka topic, whereas a KSQL table represents the state as of now. Whenever a new message arrives in the table :-
- It updates the previous message with the same key.
- It is added as a new message when there is no existing message with the same key.
A KSQL table is a stateful entity, helpful in situations where we have to keep track of the latest message for each key. Note that the key plays a crucial role here and is therefore required on every incoming message/record in the underlying Kafka topic. All of the examples below represent use-cases where we would prefer a table, because each scenario asks: what is the value NOW :-
- Stock level :- What is the current stock level for each item in the warehouse?
- Traffic :- How much web traffic is being seen in a given time period?
- Weather :- What is the current weather for a given city?
Demonstrating KSQL Tables :-
First, let’s create a Kafka topic to house the country codes along with their names :-
We can verify through the KSQL CLI whether the topic got created :-
Next, let’s go ahead and create a KSQL table on top of this pre-existing underlying Kafka topic.
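A sketch of the table definition; in ksqlDB 6.x the key column is declared with PRIMARY KEY, and the topic and column names here are assumptions:

```sql
CREATE TABLE countrytable (
    countrycode VARCHAR PRIMARY KEY,
    countryname VARCHAR
  ) WITH (KAFKA_TOPIC = 'COUNTRY-CSV', VALUE_FORMAT = 'DELIMITED');
```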
We can verify through the KSQL CLI whether the table got created :-
Let’s now investigate the definition of this KSQL table. The output below indicates that ‘COUNTRYCODE’ is the primary key of this table and ‘COUNTRYNAME’ is the value.
Here, the data we produce to the underlying Kafka topic will not reach this table, because of one intentional problem. We shall see the solution shortly.
Next, let’s check through the KSQL CLI whether the data really got produced to the Kafka topic. Indeed, it is being produced fine :-
Debugging the issue with KSQL :- Now, let’s try to query the KSQL table :-
But we are unable to see any data in this KSQL table. Let’s investigate the reason. First, let’s find out where the KSQL server logs are; run :-
Now let’s go to this directory and investigate the log file :-
The log line clearly shows that, in this Kafka topic, the value field contains 2 columns (according to the table definition, VALUE_FORMAT='DELIMITED' means the value is split on the delimiter, and our value contains a comma), while the KSQL table expects only 1 column for the value; hence no message is reflected in our KSQL table. Let’s produce a new message with only one column in the value, i.e. with no comma inside the value.
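Producing keyed, single-column values through the console producer can be sketched as follows (the key separator and the sample records are assumptions):

```shell
kafka-console-producer --bootstrap-server localhost:9092 --topic COUNTRY-CSV \
  --property "parse.key=true" --property "key.separator=:"
>FR:France
>GB:United-Kingdom
```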
Now let’s check again whether the record has reached the KSQL table. And yes, we are able to see the messages in our KSQL table now :-
Say we want to see only 1 record; we can use the ‘LIMIT’ clause :-
Next, to filter records on the basis of the primary key, we can use the ‘WHERE’ clause :-
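The LIMIT and WHERE variants can be sketched as (table and column names assumed from the example above):

```sql
SELECT * FROM countrytable EMIT CHANGES LIMIT 1;

SELECT * FROM countrytable WHERE countrycode = 'GB' EMIT CHANGES;
```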
Next, let’s update one existing record and insert another fresh record :-
The afore-pushed data now starts reflecting in the KSQL table :-
Now let’s query, by key, the record whose value we updated. We observe that only the latest value is reflected; the old value (i.e. ‘South-Africa’) has been overridden with the new value (i.e. ‘Southern-America’) for the key ‘SA’.
In the next part, we shall look at some more advanced concepts around ksqlDB.