ksqlDB: Confluent Kafka Platform | Part 2

  • Filtering streams of data.
  • Joining live streams of events with reference data (e.g. from a database).
  • Continuous, stateful aggregations.

=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=  Event Streaming Database purpose-built =
=        for stream processing apps       =
Copyright 2017-2020 Confluent Inc.

CLI v5.5.0, Server v5.5.1 located at http://ksql-0-internal.user72.svc.cluster.local:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
  • We don’t need to know the format of the data when `print`ing a topic; KSQL introspects the data and understands how to deserialize it.
  • Kafka topic names are case-sensitive (“Ratings” and “ratings” are two different topics on a Kafka broker). KSQL constructs such as streams and tables, however, are case-insensitive, as you would expect from most database-like systems.
ksql> print ratings limit 3;
create stream ratings with (kafka_topic='ratings', value_format='avro');
Exploring the “where” clause.
create stream poor_ratings as select * from ratings where stars < 3 and channel like 'iOS%';
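The filter above can be read as a per-event predicate: an event is copied into the new stream only if it has fewer than 3 stars and its channel starts with `iOS` (the `%` in `like 'iOS%'` makes it a prefix match). The following is a minimal plain-Python sketch of that predicate, not ksqlDB code; the sample events and their values are made up for illustration.

```python
from typing import Iterable, Iterator

def poor_ratings(ratings: Iterable[dict]) -> Iterator[dict]:
    """Yield only the events that pass the WHERE predicate."""
    for event in ratings:
        # stars < 3 AND channel LIKE 'iOS%' (prefix match, case-sensitive)
        if event["stars"] < 3 and event["channel"].startswith("iOS"):
            yield event

# Hypothetical sample events; only the field names come from the statement above.
sample = [
    {"user_id": 1, "stars": 1, "channel": "iOS"},
    {"user_id": 2, "stars": 4, "channel": "iOS-test"},
    {"user_id": 3, "stars": 2, "channel": "android"},
    {"user_id": 4, "stars": 2, "channel": "iOS-test"},
]
filtered = list(poor_ratings(sample))
```

In this sketch only users 1 and 4 survive: user 2 has too many stars and user 3 is on a non-iOS channel.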
describe extended POOR_RATINGS;
ksql> print POOR_RATINGS;
Output of the underlying Kafka topic, as shown by print POOR_RATINGS;
create stream customers_cdc with(kafka_topic='mysql.customer.customers-raw', value_format='AVRO');
No output when querying a stream for which no new data is arriving (queries start from the latest offset by default).
set 'auto.offset.reset' = 'earliest';
select after->first_name as first_name, after->last_name as last_name from customers_cdc emit changes;
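Why the `auto.offset.reset` setting changes what the query returns can be sketched with a toy model. This is plain Python rather than ksqlDB code, it assumes a single-partition topic, and the function name and log contents are hypothetical.

```python
from typing import List

def records_seen(log: List[str], auto_offset_reset: str, end_offset_at_start: int) -> List[str]:
    """Return the records a newly started query would read from a single-partition log.

    end_offset_at_start is the number of records already stored when the query starts.
    """
    if auto_offset_reset == "earliest":
        start = 0                      # replay everything already stored
    elif auto_offset_reset == "latest":
        start = end_offset_at_start    # skip history and wait for new records
    else:
        raise ValueError(f"unsupported value: {auto_offset_reset}")
    return log[start:]

# Hypothetical topic contents: three change events already written, nothing new arriving.
log = ["customer-1", "customer-2", "customer-3"]
```

With `latest` the query sees nothing until a new record arrives; with `earliest` it replays all three stored records.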
create stream customers_flat with (partitions=1) as select
after->id as id,
after->first_name as first_name,
after->last_name as last_name,
after->email as email,
after->club_status as club_status,
after->comments as comments
from customers_cdc partition by after->id;
print CUSTOMERS_FLAT from beginning limit 3;
describe customers_flat;
select * from customers_flat emit changes;
create table customers(rowkey int key) with (kafka_topic='CUSTOMERS_FLAT', value_format='AVRO');
mysql> update customers set first_name = 'Jay', last_name='Kreps' where id = 1;
create stream vip_poor_ratings as
select r.user_id, c.first_name, c.last_name, c.club_status, r.stars
from poor_ratings r
left join customers c
on r.user_id = c.rowkey
where lcase(c.club_status) = 'platinum';
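The join above enriches each poor rating with the customer row whose key equals the rating's `user_id`. A rough plain-Python model of that lookup follows; it is a sketch of the semantics, not ksqlDB code. The table is modeled as a dictionary keyed by `rowkey`, and the sample rows are invented. Because the `where` clause tests a column from the table side, ratings with no matching customer are dropped even though the join is a left join.

```python
from typing import Dict, List, Optional

def vip_poor_ratings(poor_ratings: List[dict], customers: Dict[int, dict]) -> List[dict]:
    """Join each rating to the customer table and keep platinum members only."""
    joined = []
    for r in poor_ratings:
        # Left join: c is None when no customer has rowkey == user_id.
        c: Optional[dict] = customers.get(r["user_id"])
        status = c["club_status"] if c is not None else None
        # lcase(c.club_status) = 'platinum'; a missing (NULL) status never matches.
        if status is not None and status.lower() == "platinum":
            joined.append({
                "user_id": r["user_id"],
                "first_name": c["first_name"],
                "last_name": c["last_name"],
                "club_status": status,
                "stars": r["stars"],
            })
    return joined

# Hypothetical table contents keyed by rowkey, and hypothetical rating events.
customers = {
    1: {"first_name": "Jay", "last_name": "Kreps", "club_status": "Platinum"},
    2: {"first_name": "Ann", "last_name": "Lee", "club_status": "bronze"},
}
poor = [
    {"user_id": 1, "stars": 1},
    {"user_id": 2, "stars": 2},
    {"user_id": 9, "stars": 1},  # no such customer
]
result = vip_poor_ratings(poor, customers)
```

Only the rating from user 1 reaches the output: user 2 is not a platinum member and user 9 has no customer row.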
show queries;
Inspecting all consumer groups.
Inspecting the consumer group for a particular query.
select first_name, last_name, count(*) as rating_count
from vip_poor_ratings
window tumbling (size 5 minutes)
group by first_name, last_name
having count(*) > 1 emit changes;
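The windowed aggregation above can be modeled as follows: each event falls into exactly one fixed 5-minute bucket, events are counted per bucket and per customer name, and only counts above 1 are reported. This plain-Python sketch is an illustration under assumptions: the `ts` field (event time in milliseconds) and the sample events are hypothetical, and the function returns only the final counts, whereas `emit changes` streams each update as it happens.

```python
from collections import Counter
from typing import Dict, List, Tuple

WINDOW_MS = 5 * 60 * 1000  # window tumbling (size 5 minutes)

def rating_counts(events: List[dict]) -> Dict[Tuple[int, str, str], int]:
    """Count events per (window start, first_name, last_name), keeping counts > 1."""
    counts: Counter = Counter()
    for e in events:
        # Tumbling windows do not overlap; each timestamp maps to one window start.
        window_start = e["ts"] - e["ts"] % WINDOW_MS
        counts[(window_start, e["first_name"], e["last_name"])] += 1
    # having count(*) > 1
    return {key: n for key, n in counts.items() if n > 1}

# Hypothetical events; "ts" is an assumed event-time field in milliseconds.
events = [
    {"ts": 0,       "first_name": "Jay", "last_name": "Kreps"},
    {"ts": 60_000,  "first_name": "Jay", "last_name": "Kreps"},
    {"ts": 310_000, "first_name": "Jay", "last_name": "Kreps"},  # falls in the next window
    {"ts": 10,      "first_name": "Ann", "last_name": "Lee"},
]
totals = rating_counts(events)
```

Here the first two events share a window and are reported with a count of 2, while the third event lands in the next window on its own and is filtered out by the `having` condition.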



aditya goel

Software Engineer for Big Data distributed systems