How to delete records from a ksqlDB materialized view

Ondrej Kvasnovsky
4 min read · Jan 25, 2023


In the "Materialized view using ksqlDB" article, we created a materialized view that represents the latest value for each key out of all the events stored in a topic. But what if we want a materialized view that also propagates deletes?

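Deleting records in a Kafka topic

To delete a record, we write a new record with the same key and a NULL value. In ksqlDB, that means an INSERT that writes a NULL value for the key. These NULL-value records are called tombstones in Kafka: a table built from the topic drops the key when it reads one, and on a compacted topic, log compaction eventually removes the older records for that key.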
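To see why a NULL value acts as a delete, it helps to model how a table is built from a topic. Records are replayed in order: each non-NULL value upserts its key, and a tombstone removes the key. The sketch below is a simplified Python model of that behavior, not ksqlDB or Kafka code. The function name `materialize` is hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple

# A record is (key, value); a value of None models a Kafka tombstone.
Record = Tuple[str, Optional[dict]]


def materialize(records: Iterable[Record]) -> Dict[str, dict]:
    """Fold an ordered changelog into a key -> latest-value table (simplified model)."""
    table: Dict[str, dict] = {}
    for key, value in records:
        if value is None:
            # Tombstone: the key disappears from the table.
            table.pop(key, None)
        else:
            # Upsert: the newest value for a key wins.
            table[key] = value
    return table


log = [
    ("AMZ", {"name": "Amazon", "revenue": 100.0}),
    ("APL", {"name": "Apple", "revenue": 130.0}),
    ("AMZ", {"name": "Amazon", "revenue": 450.0}),
    ("APL", None),  # tombstone for APL
]
print(materialize(log))  # {'AMZ': {'name': 'Amazon', 'revenue': 450.0}}
```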

Deleting records in a ksqlDB materialized view

In ksqlDB, we can only write a NULL value into a stream whose VALUE_FORMAT is set to KAFKA.

There is a feature request to add a DELETE statement that would write a NULL value into a stream. If you would like to see it implemented, upvote it here: https://github.com/confluentinc/ksql/issues/7073

First, let’s adopt the right software engineering mindset: every decision involves trade-offs, and there are no perfect solutions. As the saying goes, there are no solutions, only trade-offs.

We are going to create two additional streams, one for existing records and one for deleted records. Both streams write into the same new topic, companies_latest, which holds the final state: existing records keep their values, and deleted records are represented as records with NULL values (tombstones). The last step is to create a source table, the materialized view, on top of the companies_latest topic.

The companies_deleted stream must be created with VALUE_FORMAT set to KAFKA, because none of the other value formats can write NULL as the whole record value.
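This distinction matters because a record whose fields are all NULL is still a record. A value such as {"NAME": null, "REVENUE": null} upserts a row full of NULLs instead of deleting the key. Only a record whose entire value is NULL is a tombstone. The Python model below contrasts the two cases. It is a simplification and not ksqlDB's serializer, and `apply_record` is a hypothetical helper.

```python
from typing import Dict, Optional


def apply_record(table: Dict[str, dict], key: str, value: Optional[dict]) -> None:
    """Apply one changelog record to an in-memory table (simplified model)."""
    if value is None:
        table.pop(key, None)  # a NULL value (tombstone) deletes the key
    else:
        table[key] = value    # any non-NULL value, even one with NULL fields, upserts


table = {"APL": {"NAME": "Apple", "REVENUE": 139.0}}

# Like an AVRO value whose fields are all NULL: the row stays, with empty columns.
apply_record(table, "APL", {"NAME": None, "REVENUE": None})
print("APL" in table)  # True

# A NULL value, as the KAFKA-format companies_deleted stream writes: the row is removed.
apply_record(table, "APL", None)
print("APL" in table)  # False
```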

The following statements implement everything described above.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM companies (
id VARCHAR KEY,
name VARCHAR,
revenue DOUBLE,
deleted BOOLEAN
) WITH (
kafka_topic = 'companies',
partitions = 2,
value_format = 'AVRO'
);

CREATE STREAM companies_existing
WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'AVRO'
) AS
SELECT *
FROM companies
WHERE deleted = FALSE;

CREATE STREAM companies_deleted
WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'KAFKA'
) AS
SELECT ID, CAST(NULL AS VARCHAR)
FROM companies
WHERE deleted = TRUE;

CREATE SOURCE TABLE companies_latest (
id VARCHAR PRIMARY KEY,
name VARCHAR,
revenue DOUBLE
) WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'AVRO'
);

CAST(NULL AS VARCHAR) is necessary because ksqlDB needs to know a column's data type, even when its value is NULL.
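Together, the three statements split one input stream by the deleted flag and merge the results back into the single companies_latest topic. The Python sketch below models that data flow under simplifying assumptions: events are dicts, the output topic is a list, and a deleted event becomes a (key, None) tombstone. It mirrors the two WHERE clauses, including the fact that an event with deleted = NULL matches neither of them. It is an illustration, not how ksqlDB executes the queries, and `route` is a hypothetical name.

```python
from typing import List, Optional, Tuple

Record = Tuple[str, Optional[dict]]


def route(event: dict) -> Optional[Record]:
    """Map one `companies` event to the record written to `companies_latest`, if any."""
    key = event["id"]
    deleted = event.get("deleted")
    if deleted is True:
        # companies_deleted: WHERE deleted = TRUE, value is CAST(NULL AS VARCHAR) -> tombstone
        return (key, None)
    if deleted is False:
        # companies_existing: WHERE deleted = FALSE, SELECT * keeps all value columns
        return (key, {k: v for k, v in event.items() if k != "id"})
    # deleted IS NULL matches neither WHERE clause, so nothing is written.
    return None


companies = [
    {"id": "AMZ", "name": "Amazon", "revenue": 100.0, "deleted": False},
    {"id": "APL", "name": "Apple", "revenue": 139.0, "deleted": False},
    {"id": "APL", "deleted": True},
]
companies_latest: List[Record] = [r for r in (route(e) for e in companies) if r is not None]
for record in companies_latest:
    print(record)  # the last line printed is ('APL', None)
```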

Now we can insert the test data and observe the changes.

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('AMZ', 'Amazon', 100, false);

INSERT INTO companies (id, name, revenue, deleted)
VALUES ('AMZ', 'Amazon', 450, false);

INSERT INTO companies (id, name, revenue, deleted)
VALUES ('GOG', 'Google', 90, false);

INSERT INTO companies (id, name, revenue, deleted)
VALUES ('APL', 'Apple', 130, false);

INSERT INTO companies (id, name, revenue, deleted)
VALUES ('GOG', 'Google', 99, false);

INSERT INTO companies (id, name, revenue, deleted)
VALUES ('APL', 'Apple', 139, false);

Query the companies_latest materialized view to see it populated with the latest values for each company.

SELECT * FROM companies_latest;
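Because the table keeps only the latest value for each key, we can predict what this query should return. The Python sketch below is a simplified model, not ksqlDB. It replays the six INSERT statements above in order and lets later values overwrite earlier ones. It does not reproduce the exact column layout of the ksqlDB output.

```python
# (id, name, revenue) in the order of the INSERT statements above.
inserts = [
    ("AMZ", "Amazon", 100.0),
    ("AMZ", "Amazon", 450.0),
    ("GOG", "Google", 90.0),
    ("APL", "Apple", 130.0),
    ("GOG", "Google", 99.0),
    ("APL", "Apple", 139.0),
]

latest = {}
for company_id, name, revenue in inserts:
    latest[company_id] = (name, revenue)  # later inserts overwrite earlier ones

for company_id, (name, revenue) in sorted(latest.items()):
    print(company_id, name, revenue)
# AMZ Amazon 450.0
# APL Apple 139.0
# GOG Google 99.0
```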

Let’s delete Apple by inserting an event with deleted set to true.

INSERT INTO companies (id, deleted) VALUES ('APL', true);

Run the SELECT query again to display the latest state of the materialized view. Apple should no longer appear.

Here is the PRINT output for the companies_latest topic. The tombstone, the <null> value for key APL, is the last record in the topic.

ksql> print companies_latest;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2023/01/25 18:12:25.355 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 100.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.422 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 450.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.502 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 90.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.530 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 130.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.562 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 99.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.621 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 139.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:30.311 Z, key: APL, value: {"NAME": null, "REVENUE": null, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:16:20.989 Z, key: APL, value: <null>, partition: 0
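Replaying these eight records in order shows why Apple disappears from the table. The record at 18:12:30 still upserts APL, with NULL columns. Only the final <null> value removes it. The Python sketch below transcribes the keys and values from the log above and folds them the way a table would. It is a model of the semantics, not output from ksqlDB.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) pairs transcribed from the PRINT output above; None stands for <null>.
log: List[Tuple[str, Optional[dict]]] = [
    ("AMZ", {"NAME": "Amazon", "REVENUE": 100.0, "DELETED": False}),
    ("AMZ", {"NAME": "Amazon", "REVENUE": 450.0, "DELETED": False}),
    ("GOG", {"NAME": "Google", "REVENUE": 90.0, "DELETED": False}),
    ("APL", {"NAME": "Apple", "REVENUE": 130.0, "DELETED": False}),
    ("GOG", {"NAME": "Google", "REVENUE": 99.0, "DELETED": False}),
    ("APL", {"NAME": "Apple", "REVENUE": 139.0, "DELETED": False}),
    ("APL", {"NAME": None, "REVENUE": None, "DELETED": False}),
    ("APL", None),
]

table: Dict[str, dict] = {}
for step, (key, value) in enumerate(log, start=1):
    if value is None:
        table.pop(key, None)  # tombstone removes the row
    else:
        table[key] = value    # upsert, even when every field is NULL
    print(step, sorted(table))
# step 7 prints: 7 ['AMZ', 'APL', 'GOG']
# step 8 prints: 8 ['AMZ', 'GOG']
```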

Clean up streams and tables after experimenting

Here is a script that drops the table and the streams, so you can rerun the experiment from scratch.

DROP TABLE IF EXISTS companies_latest;
DROP STREAM IF EXISTS companies_deleted;
DROP STREAM IF EXISTS companies_existing DELETE TOPIC;
DROP STREAM IF EXISTS companies DELETE TOPIC;
