Data Delivery
Some users wish to use RisingWave for stream processing and then deliver the computed results to downstream systems. This is a common stream ETL scenario.
In RisingWave, users can directly use the CREATE SINK
statement to achieve data delivery.
Sample Code
Let's take a quick look at how RisingWave delivers data. For simplicity, we will use Apache Kafka as the downstream destination.
Preparing Data
First, we create a table
and import data using the datagen
tool:
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
Let's verify if the creation was successful:
SHOW TABLES;
Output:
Name
------
t1
(1 row)
After creating t1
for some time, we can use the SELECT
statement to query t1
:
SELECT COUNT(*) FROM t1;
Delivering Data
First, we need to start Apache Kafka on localhost, listening on port 9092 (local deployment of Apache Kafka can be achieved using Docker Compose, specific steps are omitted here). Next, we can deliver the data from the table directly to the downstream by creating a sink
:
set streaming_parallelism = 1; # align with the Kafka's default partition number to prevent producer error
CREATE SINK test_sink
FROM t1
WITH (
properties.bootstrap.server = 'localhost:9092',
topic = 'test_sink_topic',
connector = 'kafka',
primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;
Here, we specify FORMAT UPSERT ENCODE JSON
to indicate that RisingWave will use UPSERT to output JSON-formatted messages to Kafka. In the primary_key
, we specify v1
as the key for the downstream Kafka messages.
Let's check if the sink was created successfully:
SHOW SINKS;
Output (the numbers shown may be completely different):
Name
------
test_sink
(1 row)
Using the console, let's query the content of the Kafka topic test_sink_topic
:
> kafkacat -b localhost:9092 -C -t test_sink_topic -J
Output (data will continue to be sent to Kafka until datagen stops):
{"topic":"test_sink_topic","partition":0,"offset":0,"tstype":"create","ts":1700201806289,"broker":-1,"key":"{\"v1\":1}","payload":"{\"v1\":1,\"v2\":7}"}
{"topic":"test_sink_topic","partition":0,"offset":1,"tstype":"create","ts":1700201806289,"broker":-1,"key":"{\"v1\":2}","payload":"{\"v1\":2,\"v2\":5}"}
...
RisingWave also supports deliver the results of stream calculations to downstream systems. This can be achieved using either CREATE SINK FROM <materialized view>
or CREATE SINK AS <query>
.
Continue Reading
Connector - Sink: Learn more about the supported data formats, encoding formats, and data delivery methods.