How to delete records from ksqlDB materialized view
We created a materialized view in the article Materialized view using ksqlDB. The materialized view we built represents the latest values of all the events stored in a topic. But what if we want a materialized view that also propagates deletes using ksqlDB?
Deleting records in Kafka topic
To remove a record from the Kafka topic, we need to execute an INSERT statement with a NULL value. Records with a NULL value are called tombstones in Kafka.
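Conceptually, a compacted Kafka topic keeps only the latest record per key, and a tombstone removes the key entirely. Here is a minimal plain-Python sketch of these semantics (no Kafka client involved; the data is made up for illustration):

```python
# Sketch of log-compaction semantics: for each key only the latest value
# survives, and a None value (the tombstone) marks the key for removal.

def compact(log):
    """Compact a list of (key, value) records; value None is a tombstone."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)   # tombstone: drop the key entirely
        else:
            state[key] = value     # keep only the latest value per key
    return state

log = [
    ("AMZ", {"name": "Amazon", "revenue": 100}),
    ("APL", {"name": "Apple", "revenue": 130}),
    ("AMZ", {"name": "Amazon", "revenue": 450}),
    ("APL", None),  # tombstone for Apple
]
print(compact(log))  # {'AMZ': {'name': 'Amazon', 'revenue': 450}}
```

This is exactly the view a ksqlDB table takes of its backing topic, which is why writing a NULL value is how a row gets deleted.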
Deleting records in ksqlDB materialized view
We can only insert a NULL value into a stream that has VALUE_FORMAT set to KAFKA.
There is a feature request to implement some kind of DELETE statement that would insert a NULL value into a stream. See and upvote the feature request if you agree with it: https://github.com/confluentinc/ksql/issues/7073
First, let’s set the proper software engineering mindset: every decision involves trade-offs, and there are no perfect solutions. The approach below is no exception.
We are going to create additional streams for existing and deleted records. Then we create a new topic that will contain the final state, where deleted records are represented as records with NULL values. The last step will be to create a source table, the materialized view, out of the final-state companies_latest topic.
The companies_deleted stream needs to be created with VALUE_FORMAT set to KAFKA, because the other value formats do not support NULL as a value.
The following section contains the statements to do all that we mentioned above.
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM companies (
id VARCHAR KEY,
name VARCHAR,
revenue DOUBLE,
deleted BOOLEAN
) WITH (
kafka_topic = 'companies',
partitions = 2,
value_format = 'AVRO'
);
CREATE STREAM companies_existing
WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'AVRO'
) AS
SELECT *
FROM companies
WHERE deleted = FALSE;
CREATE STREAM companies_deleted
WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'KAFKA'
) AS
SELECT ID, CAST(NULL AS VARCHAR)
FROM companies
WHERE deleted = TRUE;
CREATE SOURCE TABLE companies_latest (
id VARCHAR PRIMARY KEY,
name VARCHAR,
revenue DOUBLE
) WITH (
kafka_topic = 'companies_latest',
partitions = 2,
value_format = 'AVRO'
);
CAST(NULL AS VARCHAR) is necessary because ksqlDB needs a datatype even if the value is NULL.
Now we can insert the test data and observe the changes.
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('AMZ', 'Amazon', 100, false);
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('AMZ', 'Amazon', 450, false);
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('GOG', 'Google', 90, false);
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('APL', 'Apple', 130, false);
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('GOG', 'Google', 99, false);
INSERT INTO companies (id, name, revenue, deleted)
VALUES ('APL', 'Apple', 139, false);
Query the data and observe it being populated in the companies_latest materialized view.
SELECT * FROM companies_latest;
Let’s remove Apple company.
INSERT INTO companies (id, deleted) VALUES ('APL', true);
Execute the SELECT query again to display the latest state of the materialized view.
Here is the log from the companies_latest topic, showing the tombstone as the last record.
ksql> print companies_latest;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2023/01/25 18:12:25.355 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 100.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.422 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 450.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.502 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 90.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.530 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 130.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.562 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 99.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.621 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 139.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:30.311 Z, key: APL, value: {"NAME": null, "REVENUE": null, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:16:20.989 Z, key: APL, value: <null>, partition: 0
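Replaying the printed log with last-write-wins semantics shows why the table ends up containing only Amazon and Google. A simplified plain-Python sketch (it keeps just the NAME and REVENUE fields and skips the intermediate null-valued AVRO record; this is not ksqlDB code, only an illustration of how the source table reads the topic):

```python
# The companies_latest records in offset order, with <null> as None.
records = [
    ("AMZ", {"NAME": "Amazon", "REVENUE": 100.0}),
    ("AMZ", {"NAME": "Amazon", "REVENUE": 450.0}),
    ("GOG", {"NAME": "Google", "REVENUE": 90.0}),
    ("APL", {"NAME": "Apple", "REVENUE": 130.0}),
    ("GOG", {"NAME": "Google", "REVENUE": 99.0}),
    ("APL", {"NAME": "Apple", "REVENUE": 139.0}),
    ("APL", None),  # the tombstone written by companies_deleted
]

# Materialize the latest state per key, deleting keys on tombstones.
table = {}
for key, value in records:
    if value is None:
        table.pop(key, None)
    else:
        table[key] = value

print(sorted(table))  # ['AMZ', 'GOG'] -- Apple is gone
```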
Cleanup of streams and tables after experimentation
Here is a script with the necessary DROP TABLE and DROP STREAM statements.
DROP TABLE IF EXISTS companies_latest;
DROP STREAM IF EXISTS companies_deleted;
DROP STREAM IF EXISTS companies_existing DELETE TOPIC;
DROP STREAM IF EXISTS companies DELETE TOPIC;
Resources and Materials
- https://medium.com/@damienthomlutz/deleting-records-in-kafka-aka-tombstones-651114655a16
- https://stackoverflow.com/questions/66305527/how-to-delete-a-value-from-ksqldb-table-or-insert-a-tombstone-value/66314510#66314510
- https://github.com/confluentinc/ksql/issues/7073
- https://rmoff.net/2020/11/03/kafka-connect-ksqldb-and-kafka-tombstone-messages/
- https://github.com/confluentinc/ksql/blob/22a79bc7695795494d4aaed8618bf3de252271a4/design-proposals/klip-49-add-source-stream-table-semantic.md
- https://forum.confluent.io/t/tombstone-messages-not-propagated/2612