Skip to main content

数据导出

一些用户希望使用 RisingWave 做流计算,然后将计算后的结果导出到下游系统中去。这是常见的流式 ETL 场景。

在 RisingWave 中,用户可以直接使用 CREATE SINK 语句实现数据导出。

代码示例

我们现在快速验证一下 RisingWave 导出数据的能力。简单起见,这里我们选用 Apache Kafka 作为导出数据的下游。

准备需导出的数据

我们首先创建一个 table ,并使用 datagen 工具导入数据:

CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',

fields.v1.kind = 'sequence',
fields.v1.start = '1',

fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',

datagen.rows.per.second = '10'
) ROW FORMAT JSON;

我们来验证一下创建是否成功:

SHOW TABLES;

得到:

 Name
------
t1
(1 row)

在创建 t1 一段时间后,我们用 SELECT 语句查询一下 t1 :

SELECT count(*) FROM t1;

直接导出数据

首先我们在本地 localhost 启动 Apache Kafka 监听 9092 端口(Apache Kafka 的本地部署可以通过 Docker Compose 实现,具体步骤这里先略过), 接下来我们可以通过创建 sink 直接把 table 的数据导出到下游:

CREATE SINK test_sink
FROM t1
WITH (
properties.bootstrap.server = 'localhost:9092',
topic = 'test_sink_topic',
connector = 'kafka',
primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;

这里我们指定了 FORMAT UPSERT ENCODE JSON 来表达 RisingWave 使用 UPSERT 的方式将 JSON 格式的消息输出到下游 Kafka。其中 primary_key 里指定了 v1 作为下游 Kafka 消息的 key。

验证一下 sink 是否创建成功:

SHOW SINKS;

得到结果(具体得到的数字可能完全不一样):

Name 
------
test_sink
(1 rows)

在 console 使用工具查询一下 kafka test_sink_topic 的内容:

> kafkacat -b localhost:9092 -C -t test_sink_topic -J                  

得到结果(数据会持续生产到kafka直到datagen结束):

{"topic":"test_sink_topic","partition":0,"offset":0,"tstype":"create","ts":1700201806289,"broker":-1,"key":"{\"v1\":1}","payload":"{\"v1\":1,\"v2\":7}"}
{"topic":"test_sink_topic","partition":0,"offset":1,"tstype":"create","ts":1700201806289,"broker":-1,"key":"{\"v1\":2}","payload":"{\"v1\":2,\"v2\":5}"}
...

当然 RisingWave 也支持将流计算后的结果导出到下游系统,我们可以通过 CREATE SINK FROM <materialized view> 或者 CREATE SINK AS <query> 来实现。

继续阅读