Materialized view using ksqlDB

Ondrej Kvasnovsky
3 min readJan 23, 2023

--

It is necessary to have Confluent Platform running on your local machine. If you need assistance with the installation process, you can refer to the guide “How to install Confluent Platform on local”.

Create stream for your data

The first step in creating a materialized view using ksqlDB is to create a stream for pushing data. This can be done by pasting the appropriate query into the command line interface (CLI) or by using the Confluent UI.

CREATE STREAM company_stream (
id VARCHAR KEY,
name VARCHAR,
revenue DOUBLE
) WITH (
kafka_topic = 'company',
partitions = 2,
value_format = 'json'
);

Materialized view

Once the stream is set up, we can move on to creating a materialized view. This will consume events from the stream and use them to construct and maintain an updated state.

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE company_latest AS
SELECT
id,
LATEST_BY_OFFSET(name) as name,
LATEST_BY_OFFSET(revenue) AS revenue
FROM company_stream
GROUP BY id
EMIT CHANGES;

Insert some data and observe the changes

To test our materialized view, we can insert some data into the company_stream and observe the changes in the materialized view. Before inserting data, let’s first preview the contents of the materialized view. This should return an empty result set, as we have not yet added any data.

SELECT * 
FROM companies_latest;

With the data inserted into the company_stream, we can now observe the changes in the materialized view. The data can be inserted row by row or all at once. As the data is added, we can see the materialized view table being updated to reflect the new state.

INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 100);

INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 450);

INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 90);
INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 130);

INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 99);

INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 139);

After inserting the data, we can again preview the contents of the materialized view. This time we should see the latest changes that have been made to the table, reflecting the new state of the data.

SELECT * 
FROM companies_latest;

So, if we need to insert a new company, we do INSERT. If we need to update the company, we do INSERT with the existing ID and new values.

Deleting records deserves its own article.

Cleanup data (for experimentations)

To clean up the data from both the stream and the materialized view, it is necessary to execute a DROP command first on the materialized view, and then on the stream. Additionally, it is important to include a DROP TOPIC command to remove the data from the both topics that were automatically created.

DROP TABLE IF EXISTS company_latest DELETE TOPIC;
DROP STREAM IF EXISTS company_stream DELETE TOPIC;

Resources

--

--

No responses yet