KSQL — Getting Started (Part 3/3)
KSQL (ksqlDB) is a streaming SQL engine built on top of Apache Kafka, designed to perform real-time data processing and analysis using SQL-like queries. It allows users to query, transform, and aggregate data flowing through Kafka topics in real time without needing to write complex code.
In this blog, we are going to learn more about:
* Partitioning Requirements
* Windowing
* User Defined Functions
Please go through the previous parts of this series before starting this one.
Key Features of KSQL:
Partitioning Requirements:
- Tables are always partitioned by their PRIMARY KEY, and ksqlDB doesn’t allow repartitioning of tables, meaning you can only use a table’s primary key as a join column.
- Streams don’t have primary keys, but they do have an optional KEY column. A KEY column, when present, defines the partitioning column.
- Streams allow joins on expressions other than their key column. When the join criteria differ from the KEY column, ksqlDB internally repartitions the stream, which implicitly defines the correct key and partitioning.
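For example, here is a minimal sketch of that repartition-on-join behavior. The topics, streams, and column names (rides, cities, rider_id, city, fare, region) are hypothetical and only for illustration:
-- Hypothetical stream: the KEY column (rider_id) defines its partitioning column
CREATE STREAM rides (
  rider_id VARCHAR KEY,
  city VARCHAR,
  fare DOUBLE
) WITH (KAFKA_TOPIC='rides', VALUE_FORMAT='JSON', PARTITIONS=3);
-- Hypothetical table keyed on city
CREATE TABLE cities (
  city VARCHAR PRIMARY KEY,
  region VARCHAR
) WITH (KAFKA_TOPIC='cities', VALUE_FORMAT='JSON', PARTITIONS=3);
-- Joining the stream on city (not its KEY column) makes ksqlDB
-- repartition the stream internally before performing the join
CREATE STREAM rides_with_region AS
  SELECT r.rider_id, r.city, r.fare, c.region
  FROM rides r
  JOIN cities c ON r.city = c.city;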
Co-Partitioning Requirements
When using ksqlDB to join streaming data, it’s essential to ensure that your streams and tables are co-partitioned (except for foreign-key table-table joins). Co-partitioning means that the input records on both sides of the join must have matching partition configurations.
Here are the requirements for co-partitioning:
- The input records on both sides must have the same key schema.
- The number of partitions must be identical for both sides of the join.
- Both sides must use the same partitioning strategy.
For ksqlDB to effectively join two data sources — whether streams or tables — it compares records based on the join column. To ensure that records with the same join column are processed together, this column must match the one used for partitioning in the sources.
When your inputs are correctly co-partitioned, records with the same key from both sides of the join are routed to the same stream task during processing, ensuring accurate and efficient join operations.
Records Have the Same Schema
- For a join to work, the keys from both sides must have the same SQL type.
- If the schemas of the columns you wish to join on don't match, it may be possible to CAST one side to match the other.
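For instance, a rough sketch where a hypothetical orders stream carries an INT customer_id and a hypothetical customers table is keyed by a STRING customer_id:
-- CAST the stream-side column so both join columns share the same SQL type
SELECT o.order_id, o.customer_id, c.name
FROM orders o
JOIN customers c ON CAST(o.customer_id AS STRING) = c.customer_id
EMIT CHANGES;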
Table Keys
- Tables created on top of existing Kafka topics, for example those created with a CREATE TABLE statement, are keyed on the data held in the key of the records in the Kafka topic. ksqlDB presents this data in the PRIMARY KEY column.
- Tables created inside ksqlDB from other sources, for example those created with a CREATE TABLE AS SELECT statement, will copy the key from their source(s) unless there is an explicit GROUP BY or JOIN clause, which can change what the table is keyed on.
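As a rough illustration of both cases, reusing the hypothetical rides stream from earlier (users and rides_per_city are likewise made-up names):
-- Table over an existing topic: the PRIMARY KEY column maps to the Kafka record key
CREATE TABLE users (
  user_id VARCHAR PRIMARY KEY,
  name VARCHAR
) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
-- Derived table: the GROUP BY column (city) becomes the new PRIMARY KEY
CREATE TABLE rides_per_city AS
  SELECT city, COUNT(*) AS ride_count
  FROM rides
  GROUP BY city;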
Stream Keys
- Specify the KEY column when creating a stream.
- To repartition a stream, use the PARTITION BY clause.
- Be aware that Kafka guarantees the relative order of any two messages from one source partition only if they are also both in the same partition after the repartition. Otherwise, Kafka is likely to interleave messages. The use case will determine if these ordering guarantees are acceptable.
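For example, to repartition the hypothetical rides stream from earlier so that city becomes its key column:
CREATE STREAM rides_by_city AS
  SELECT *
  FROM rides
  PARTITION BY city;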
Records Have the Same Number of Partitions
For joins in ksqlDB, the input records must have the same number of partitions on both sides, except in the case of foreign-key table-table joins.
ksqlDB enforces this co-partitioning requirement by rejecting any join where the partition counts differ.
To verify the partition count, you can use the DESCRIBE <source name> EXTENDED command in the CLI to identify the Kafka topic associated with a source, and the SHOW TOPICS command to list topics along with their partition counts.
If the partitions differ between the sides of a join, you might need to adjust the partition counts of the source topics or repartition one side to match the other.
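For example, with the hypothetical sources used earlier:
-- Identify the Kafka topic backing each side of the join
DESCRIBE rides EXTENDED;
DESCRIBE cities EXTENDED;
-- List topics along with their partition counts
SHOW TOPICS;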
Records Have the Same Partitioning Strategy
For a successful join in ksqlDB, the records on both sides must follow the same partitioning strategy. This ensures that records with the same key are sent to the same partition.
In practical terms, this means that the input records for both sides of the join must be in the same partition. For example, in a stream-table join, if a userId key with the value alice123 is in Partition 1 for the stream but in Partition 2 for the table, the join won't match, even though both sides are keyed by userId.
ksqlDB doesn’t automatically check if the partitioning strategies align for both inputs, so it’s your responsibility to ensure they match.
Windowing:
- Representing time consistently enables aggregation operations, like SUM, on streams and tables over distinct time boundaries. In ksqlDB, these boundaries are called windows.
- A window has a start time and an end time, which you access in your queries by using the WINDOWSTART and WINDOWEND system columns.
Tumbling Window
- A tumbling window is defined by a single property: the window’s duration.
- The tumbling window's start time is inclusive, but the end time is exclusive.
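For example, a sketch of an hourly count per city over the hypothetical rides stream, using WINDOWSTART and WINDOWEND to expose the window bounds:
SELECT city,
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end,
       COUNT(*) AS ride_count
FROM rides
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY city
EMIT CHANGES;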
Hopping Window
- A hopping window is defined by two properties: the window’s duration and its advance, or “hop”, interval.
- The hopping window's start time is inclusive, but the end time is exclusive.
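For example, 30-minute windows that advance every 10 minutes, so each record falls into three overlapping windows (again on the hypothetical rides stream):
SELECT city, COUNT(*) AS ride_count
FROM rides
WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 10 MINUTES)
GROUP BY city
EMIT CHANGES;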
Session Window
- A session window aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or “idleness”.
- Any records with timestamps that occur within the inactivity gap of existing sessions are merged into the existing sessions. If a record’s timestamp occurs outside of the session gap, a new session is created.
- A new session window starts when a record arrives more than the specified inactivity gap after the previous record for that key.
- Session windows are different from the other window types, because:
  - ksqlDB tracks all session windows independently across keys, so windows of different keys typically have different start and end times.
  - Session window durations vary. Even windows for the same key typically have different durations.
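For example, counting rides per rider within sessions separated by 20 minutes of inactivity (hypothetical rides stream again):
SELECT rider_id, COUNT(*) AS rides_in_session
FROM rides
WINDOW SESSION (20 MINUTES)
GROUP BY rider_id
EMIT CHANGES;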
User Defined Functions
KSQL has a programming API for building your own functions:
- User-Defined Functions (UDF): a scalar function that returns one output value for each input row
- User-Defined Aggregate Functions (UDAF): an aggregate function that returns one output value for many input rows
- Implemented as custom jars
- Jars copied to the “ext” directory of the KSQL server
Once the jar is copied (and the KSQL server restarted), run the SHOW FUNCTIONS (or LIST FUNCTIONS) command in the ksql command line; the copied UDF/UDAF will be listed there.
To view the UDF:
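For example, from the ksql CLI (assuming TAXI_WAIT, the custom UDF used in the query below, has been loaded):
-- List all available functions, built-in and custom
SHOW FUNCTIONS;
-- Show the signature and metadata of the custom UDF
DESCRIBE FUNCTION TAXI_WAIT;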
And you can query it:
select user
, round(dist) as dist
, weather_description
, round(TAXI_WAIT(weather_description, dist)) as taxi_wait_min
from ridetodest emit changes;
Cool, you've got the basics of KSQL covered.
KSQL is a powerful tool for real-time data processing on Kafka streams, offering strong support for partitioning, time-based windowing, and extensibility through user-defined functions. These features make it highly suitable for building and deploying real-time applications that require complex data transformations, aggregations, and analysis.