KSQL — Getting Started (Part 1/3)

Rasiksuhail
4 min read · Jan 31, 2024


KSQL is an open-source streaming SQL engine for Apache Kafka. It allows developers and data engineers to process and analyze data streams in real-time using familiar SQL syntax. KSQL is part of the Confluent Platform, a set of tools and services built around Apache Kafka.

KSQL provides a SQL interface to work with Kafka topics, enabling users to express stream processing operations using SQL queries. It simplifies the development of stream processing applications by abstracting away the complexity of low-level Kafka Streams APIs.

Key Features

  • Declarative Processing: Define stream processing logic using SQL statements.
  • Real-Time Insights: Analyze and transform data as it flows through Kafka topics in real-time.
  • Stateful Operations: KSQL supports stateful operations, allowing you to maintain and update state during stream processing.
  • Interoperability: Easily integrate KSQL with other Kafka ecosystem tools and applications.

Basic Concepts

  • Streams: In KSQL, a stream represents an unbounded sequence of data records. Streams can be processed and transformed using SQL queries.
  • Tables: Tables represent the latest value for each unique key — a continuously updated, mutable view of a stream’s state, rather than the full history of events.
  • Queries: KSQL queries are written in SQL and operate on streams or tables.
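
As a sketch of how queries differ between the two collection types (the stream and table names here are hypothetical, assuming they have already been created):

```sql
-- Push query: subscribes to a stream and emits every new event as it arrives
SELECT * FROM pageviews EMIT CHANGES;

-- Pull query: looks up the current value for a key in a materialized table
SELECT * FROM pageviews_per_user WHERE user_id = 'alice';
```

Push queries run forever and stream results to the client; pull queries return the current state for a key and terminate, much like a lookup in a conventional database.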

Main components:

  • ksqlDB engine — processes SQL statements and queries
  • REST interface — enables client access to the engine
  • ksqlDB CLI — console that provides a command-line interface (CLI) to the engine
  • ksqlDB UI — enables developing ksqlDB applications in Confluent Control Center and Confluent Cloud

Events:

  • An event is anything that occurred and was recorded
    * High level — sale of an item
    * Low level — new entry in web server log
  • Events are ksqlDB’s core unit of data
  • ksqlDB represents events by using a simple key/value model
    * The key represents some form of identity about the event
    * The value represents information about the event that occurred
    * The combination of key and value makes it easy to model stored events: multiple events with the same key represent the same identity, irrespective of their values
  • An event is analogous to a row in a relational database
  • Every row supports three pseudo columns:
    * ROWTIME
    * ROWPARTITION
    * ROWOFFSET
  • Windowed sources support two additional pseudo columns:
    * WINDOWSTART
    * WINDOWEND
  • Starting in ksqlDB 0.24, you can create columns that are populated by a record’s header field
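
For instance, the pseudo columns can be selected like ordinary columns, and a header column can be declared at stream creation. A minimal sketch, assuming a hypothetical stream of songs with artist and title columns:

```sql
-- Pseudo columns expose record metadata alongside the payload
SELECT ROWTIME, ROWPARTITION, ROWOFFSET, artist, title
FROM songs
EMIT CHANGES;

-- Since ksqlDB 0.24, a column marked HEADERS captures the record's headers
CREATE STREAM songs_with_headers (
  artist VARCHAR,
  title VARCHAR,
  hdrs ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (kafka_topic='songs', value_format='json');
```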

Stream Processing:

  • Stream Processing — Creates programs that operate continually over unbounded streams of events
    * In ksqlDB, you manipulate events by deriving new collections from existing ones and describing the changes between them
    * When a collection is updated with a new event, ksqlDB updates the collections that are derived from it in real-time
  • Critical operations in a streaming application:
    * Transforming
    * Filtering
    * Joining
    * Aggregating

Derive a new stream from an existing stream

CREATE STREAM rock_songs (artist VARCHAR, title VARCHAR)
WITH (kafka_topic='rock_songs', partitions=2, value_format='avro');

CREATE STREAM title_cased_songs AS
SELECT artist, UCASE(title) AS capitalized_song
FROM rock_songs
EMIT CHANGES;
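
To try this out, you can insert a row into the source stream and watch it arrive, transformed, in the derived stream (the sample values are made up):

```sql
INSERT INTO rock_songs (artist, title) VALUES ('Led Zeppelin', 'Kashmir');

-- Push query on the derived stream; the title arrives upper-cased
SELECT * FROM title_cased_songs EMIT CHANGES;
```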

Deriving a new table from an existing stream

CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
WITH (kafka_topic='products', partitions=1, value_format='json');


CREATE STREAM orders (product_name VARCHAR KEY)
WITH (kafka_topic='orders', partitions=1, value_format='json');



CREATE TABLE order_metrics AS
SELECT p.product_name, COUNT(*) AS count, SUM(p.cost) AS revenue
FROM orders o JOIN products p ON p.product_name = o.product_name
GROUP BY p.product_name EMIT CHANGES;
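
Because order_metrics is derived from other collections, ksqlDB materializes it, so it can also serve pull queries for the current value of a key (the product name here is hypothetical):

```sql
-- Pull query: returns the current count and revenue for one product
SELECT * FROM order_metrics WHERE product_name = 'sneakers';
```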

Deriving a new table from an existing table

CREATE TABLE page_view_metrics AS
SELECT url, location_id, COUNT(*) AS count
FROM page_views GROUP BY url, location_id EMIT CHANGES;



CREATE TABLE page_view_metrics_mountain_view AS
SELECT url, count FROM page_view_metrics
WHERE location_id = 42 EMIT CHANGES;

Deriving a new stream from multiple streams

CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
WITH (kafka_topic='impressions', partitions=1, value_format='json');

CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
WITH (kafka_topic='clicks', partitions=1, value_format='json');



CREATE STREAM clicked_impressions AS
SELECT * FROM impressions i JOIN clicks c WITHIN 1 MINUTE
ON i.user = c.user
WHERE i.url = c.url
EMIT CHANGES;

Materialized Views

  • Evaluates a query on the changes only (the delta), instead of re-evaluating it over the entire table
    - When a new event arrives, the aggregation function that defines the view is applied only to that event, evolving the view’s current state into a new state
    - A view is never “fully recomputed” when new events arrive
  • Queries against materialized views are highly efficient
  • A table created directly on top of a Kafka topic is not materialized
  • When a table is derived from another collection, ksqlDB materializes its results

-- Define a stream of order events
CREATE STREAM orders_stream (order_id INT, status VARCHAR)
WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');

-- Define a table to represent the latest order status for each order ID
CREATE TABLE latest_order_status AS
SELECT order_id, LATEST_BY_OFFSET(status) AS status
FROM orders_stream
GROUP BY order_id;

-- Define a materialized view to show the total count of orders for each status
CREATE TABLE order_status_count AS
SELECT status, COUNT(*) AS count
FROM orders_stream
GROUP BY status;

Comparison:

  • The latest_order_status table tracks the latest status of each order ID in real time.
  • The order_status_count table is a materialized view that maintains a running count of orders for each status.

In summary, both constructs are continuously maintained: tables keep the latest state per key, while materialized views keep the precomputed result of a query incrementally up to date, letting you serve fast pull queries without recomputing anything at read time.
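
Continuing the example above, the materialized order_status_count view can answer point lookups directly ('SHIPPED' is a hypothetical status value):

```sql
-- Efficient pull query against the materialized view
SELECT status, count FROM order_status_count WHERE status = 'SHIPPED';
```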

More to be continued about KSQL in the next blogs.

Happy Learning!
