KSQL-DB : Confluent Kafka Platform | Part2

If you have landed here directly, it is highly recommended that you first read the post linked here.

What is KSQL? KSQL is the streaming SQL engine for Apache Kafka. This blog steps through some practical examples of how to use KSQL to build powerful stream-processing applications:

  • Filtering streams of data.
  • Joining live streams of events with reference data (e.g. from a database).
  • Continuous, stateful aggregations.

Introduction to the use-case :- We have an airline, for which we need to analyse, in real time, the reviews received on-flight :-

Below are the details of the use-case that we shall be addressing in this blog :-

Here is the approach that we shall adopt in order to analyse the reviews in real time and then set up alerts :-

Introduction to KSQL :- KSQL can be accessed via the command line interface (CLI), a graphical UI built into Confluent Control Center, or the documented REST API. For this blog, we shall be using the CLI. If you have used tools for MySQL, Postgres or Oracle’s SQL*Plus before, this should feel very familiar. Let’s switch back to our terminal session and fire it up. This will connect to your personal KSQL Server.


=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=  Event Streaming Database purpose-built =
=        for stream processing apps       =

Copyright 2017-2020 Confluent Inc.

CLI v5.5.0, Server v5.5.1 located at http://ksql-0-internal.user72.svc.cluster.local:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Step 1.) See available Kafka topics and data :- KSQL can be used to view the topic metadata on a Kafka cluster. Try:

show topics;

Notice the two important points :-

  • We don’t need to know the format of the data when `print`ing a topic; KSQL introspects the data and understands how to deserialize it.
  • Kafka topic names are case-sensitive (“Ratings” and “ratings” are two different topics on a Kafka broker). All the KSQL constructs though, like Streams and Tables and everything else, are case-insensitive as you would expect from most database-like systems.

Step 2.) We can also inspect some of the data in the aforementioned topic. Note that the data in this Kafka topic is being published by our Spring Boot microservice. Note also that the print command works only on topics, not on streams.

ksql> print ratings limit 3;

For example, one event can be read as :- the customer with user_id 18 gave a 3-star rating, with rating_id 1100991, for route_id 1831 at rating_time 1331881 through the channel ios, and submitted the message: “thank you for the most friendly, helpful experience today at your new lounge.”

The event stream driving this example is a simulated stream of events representing the ratings left by users on a mobile app or website, with fields including the device type that they used, the star rating (a score from 1 to 5), and an optional comment associated with the rating.

Step 3.) To make use of our ratings and customers topics in KSQL we first need to define some Streams and/or Tables over them. Register the RATINGS data as a KSQL stream, sourced from the ‘ratings’ topic.

create stream ratings with (kafka_topic='ratings', value_format='avro');

Notice that here we are using the Schema Registry with our Avro-formatted data to pull in the schema of this stream automatically. If our data were in some other format which can’t be described in the Schema Registry, such as CSV messages, then we would also need to specify each column and its data type in the create statement.
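As a rough sketch of what such an explicit declaration might look like, the statement below registers a stream over delimited (CSV) data. The topic name ratings_csv, the column names and the column types are all assumptions made for illustration; they loosely mirror the fields seen in the sample rating event above and are not taken from the real schema.

```sql
-- Hypothetical example: 'ratings_csv' and every column and type below are assumed.
-- With DELIMITED data there is no schema to fetch from the Schema Registry,
-- so each column is declared in the order in which it appears in the message.
create stream ratings_csv (
  rating_id bigint,
  user_id int,
  stars int,
  route_id int,
  rating_time bigint,
  channel varchar,
  message varchar
) with (kafka_topic='ratings_csv', value_format='delimited');
```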

Step 4.) Let’s check whether our create stream command was successful :-

show streams;

Step 5.) Let’s examine the columns of our stream with the following command :- describe ratings;

Step 6.) For more detail about the stream, including its runtime statistics, we can use the following command :- describe extended ratings;

Step 7.) Let’s now query the stream, i.e. the data that we are constantly receiving into it (technically, into its underlying Kafka topic, from the microservice) :-
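A minimal query for this step, assuming we only want to sample a handful of rows, could look like the following. The limit of 5 is an arbitrary choice so that the query ends on its own instead of running until it is cancelled.

```sql
-- Push query: streams rows to the CLI as they arrive in the 'ratings' topic.
-- 'limit 5' is an assumed value; drop it to keep the query running (Ctrl-C to stop).
select * from ratings emit changes limit 5;
```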

Step 8.) Let’s try out the where clause on streams. We will see all the entries in the stream “ratings” for which the star rating received is less than 3 and for which the user submitted the rating through an iOS channel. Note that the entries here are continuously being produced by the backend microservice.

Exploring “where” clause.
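A sketch of such a filter, reusing the same predicate that Step 9 turns into a persistent query:

```sql
-- Transient push query: nothing is written back to Kafka, rows are only
-- shown in the CLI until the query is cancelled.
select * from ratings
where stars < 3 and channel like 'iOS%'
emit changes;
```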

Step 9.) Persistent Queries :- Let’s now convert this test query into a persistent one. A persistent query is one which starts with create and continuously writes its output into a topic in Kafka.

create stream poor_ratings as select * from ratings where stars < 3 and channel like 'iOS%';
describe extended POOR_RATINGS;

We can see here that, on average, 340 new messages are arriving in this stream. So far, we have received 33,712 messages in total.

Also note that there is always an underlying Kafka topic, which gets created automatically for any new stream that we create like this. Here too, the backing topic has been created :-

ksql> print POOR_RATINGS;
Output of underlying KAFKA topic → print POOR_RATINGS;

Step 10.) Let’s now investigate the customer data that we have in the MySQL database, which looks as follows :-

Step 11.) Next, we shall bring the customer lookup data from MySQL into KSQL-DB, using a MySQL CDC data feed. Defining a lookup table for customer data from our CDC feed is a multi-step process :-

create stream customers_cdc with(kafka_topic='mysql.customer.customers-raw', value_format='AVRO');

Step 12.) Let’s now inspect the structure of this stream :-

Note the structure of the event as received from CDC: we have BEFORE and AFTER fields, along with ROWKEY and ROWTIME.
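The structure can be displayed with the same describe command that we used earlier for the ratings stream:

```sql
-- Lists the columns of the stream, including the nested BEFORE and AFTER structs.
describe customers_cdc;
```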

Step 13.) Now, what happens when we query this new stream? Since we aren’t pushing new records into this stream (technically, into its backing topic) by changing data in MySQL, we won’t see any query output.

No Output of Querying onto Stream (for which no data is arriving)

What we are seeing here in KSQL (or rather, not seeing!) is actually the normal behavior of any Kafka client application, and remember, that’s exactly what a KSQL query is. By default, all Kafka client applications, when they start up, consume only the messages which arrive in their input topics from that moment onwards. Older records in the topic are not consumed. We can control this behavior, though, by setting a configuration property called ‘auto.offset.reset’. In KSQL, the various configuration settings for our SQL apps can be inspected by typing show properties; in the KSQL CLI. If you try that, you will see a long list of technical defaults, almost all of which we can safely ignore :-) We adjust the property which controls where new queries start reading from like this:

set 'auto.offset.reset' = 'earliest';

Now all the subsequent queries we issue will pick up this setting and consume all the records in a topic, including those produced in the past. This setting will stay in effect for the rest of your KSQL session.
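To see the effect, we can re-run the query from Step 13. This is a minimal sketch, assuming the setting above has been applied in the same CLI session:

```sql
-- With 'auto.offset.reset' = 'earliest' in effect, this push query starts from
-- the oldest record still held in the backing topic instead of from "now".
select * from customers_cdc emit changes;
```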

In one sense, this is the historical data output from the “customers_cdc” stream, which we now see because we set 'auto.offset.reset' to 'earliest'. We have received 20 records back (representing the original inserts into MySQL). Here, one event can be read as :- the customer with id 1 was inserted into the underlying MySQL database table at ROWTIME 1631612896846. Since this record did not exist in the database before that moment, its BEFORE field is NULL and its AFTER field shows the data that was inserted. The event type OP for this event was C, i.e. it was a create event.

Step 14.) Let’s now perform “struct-dereferencing” with the “->” operator.

select after->first_name as first_name, after->last_name as last_name from customers_cdc emit changes;

Observe that the message structure here is actually quite complex and nested. Remember that we can view it with describe customers_cdc; We should see a bunch of metadata about the actual change in MySQL (timestamp, transaction id, etc.) and then a 'Before' and 'After' image of the changed row.

Note that, since we had earlier set 'auto.offset.reset' to 'earliest', we again received 20 records back (representing the original inserts into MySQL). Since no fresh data is landing, no further rows appear after those.
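The same operator works on both row images. The sketch below assumes that the change-type column is exposed as op (as the OP value seen earlier suggests) and that the BEFORE struct carries the same club_status field as AFTER; neither has been checked against the real schema.

```sql
-- Assumed column names: 'op', 'before->club_status', 'after->club_status'.
-- Puts the old and new value of one field side by side for each change event.
select op,
       before->club_status as old_status,
       after->club_status as new_status
from customers_cdc
emit changes;
```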

Step 15.) For our use-case, we want to extract just the changed record values from the CDC structures, re-partition on the ID column, and set the target topic to have the same number of partitions as the source ratings topic:

create stream customers_flat with (partitions=1) as
select
after->id as id,
after->first_name as first_name,
after->last_name as last_name,
after->email as email,
after->club_status as club_status,
after->comments as comments
from customers_cdc partition by after->id;

NOTE: Whenever we create a stream from another stream like this, KSQL also automatically creates a backing Kafka topic with the same name.

print CUSTOMERS_FLAT from beginning limit 3;

Step 16.) Let’s now inspect the data in the newly created stream and its underlying Kafka topic :-

describe customers_flat;
select * from customers_flat emit changes;

Step 17.) Let’s now register the CUSTOMER data as a KSQL table, sourced from this new, re-partitioned topic :-

create table customers(rowkey int key) with (kafka_topic='CUSTOMERS_FLAT', value_format='AVRO');

So now we have a “pipeline” of queries to read the CDC data, reformat it, and push it into a KSQL table we can use to do lookups against. Let’s check our table at this point:

select * from customers emit changes;

Step 18.) Let’s now test this new pipeline. In a new terminal window, side-by-side with the one we are already using, connect to the server again and launch the MySQL client and let’s update an existing record.

> update customers set first_name = 'Jay', last_name='Kreps' where id = 1;

Assuming that we leave our KSQL query on the table (select * from customers emit changes;) running in the first window, watch what happens as we change data in the MySQL source. The changed record lands immediately in our KSQL table.

Step 19.) Let’s now identify the unhappy customers :- Now that we have both our POOR_RATINGS stream and our continuously-updating table of customer data, we can join them together to find out details about the customers who are posting negative reviews, and see if any of them are our valued elite customers.

create stream vip_poor_ratings as
select r.user_id, c.first_name, c.last_name, c.club_status, r.stars
from poor_ratings r
left join customers c
on r.user_id = c.rowkey
where lcase(c.club_status) = 'platinum';

Let’s now inspect this newly launched stream :-

There is also an underlying Kafka topic for every stream. The one above is a persistent stream, and here is how its underlying Kafka topic looks :-
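Both views can be reproduced with the commands we have already used on the earlier streams. The limits below are arbitrary values chosen to keep the output short.

```sql
-- Inspect the stream definition and its runtime statistics.
describe extended vip_poor_ratings;

-- Sample a few joined rows from the stream (the limit of 5 is an assumed value).
select * from vip_poor_ratings emit changes limit 5;

-- Look at the backing topic directly; topic names are case-sensitive.
print VIP_POOR_RATINGS limit 3;
```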

Step 20.) What do you think would happen if you went and changed the club_status of a customer while this join query is running?
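One way to find out is to try it. The sketch below assumes that the MySQL session from Step 18 is still open and that customer 1 is not already a platinum member; both are assumptions. In a stream-table join, each incoming rating is matched against the table row as it stands when that rating is processed. Ratings that arrive after the change should therefore be joined with the new club_status, while rows already written to vip_poor_ratings are not revised.

```sql
-- Run in the MySQL client (not the KSQL CLI).
-- Assumes customer 1 exists and is currently not 'platinum'.
update customers set club_status = 'platinum' where id = 1;

-- Then, back in the KSQL CLI, keep watching the join output:
-- select * from vip_poor_ratings emit changes;
```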

Step 21.) Let’s go ahead now and inspect all of our queries running :-

show queries;
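Each row of that output carries a query ID, which can be passed to explain to see the execution plan and the source and sink topics of one particular query. The ID used below is purely hypothetical; copy the real one from your own show queries output.

```sql
-- 'CSAS_POOR_RATINGS_0' is a made-up ID used for illustration only.
explain CSAS_POOR_RATINGS_0;
```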

Step 22.) Let’s now view the consumer lag for our queries :- Over in the Confluent Control Center browser window, navigate to ‘Consumers’ and, in the table of consumer groups, try to find the one for our join query and click on it. All the consumer-group names are prefixed with ‘_confluent-ksql-<user-id>.ksql_query_’ plus the ID of the query, as shown in the output of show queries.

Inspecting all Consumer-Groups.
Inspecting any particular consumer-group for a Query.

Step 23.) Let’s explore the following idea: which customers are so upset that they post multiple bad ratings in quick succession? Perhaps we want to route those complaints directly to our Customer Care team to do some outreach.

select first_name, last_name, count(*) as rating_count
from vip_poor_ratings
window tumbling (size 5 minutes)
group by first_name, last_name
having count(*) > 1 emit changes;

This may take a minute or two to return any data, as we are now waiting for the random data generator (our backend microservice, in the real world) which populates the original ‘ratings’ topic to produce the needed set of output.

Also, we could prefix this query with “create table very_unhappy_vips as” to continuously record the output :-
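Written out in full, the persistent version might look like the sketch below. It is the query above with the prefix added and the trailing emit changes dropped, in line with the other create ... as select statements in this blog.

```sql
-- Persistent query: the windowed counts are continuously written to a new
-- table (and its backing Kafka topic) instead of only being shown in the CLI.
create table very_unhappy_vips as
select first_name, last_name, count(*) as rating_count
from vip_poor_ratings
window tumbling (size 5 minutes)
group by first_name, last_name
having count(*) > 1;
```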

Thanks for reading through this blog. We shall see you in next part of this series.
