按照文档中的示例: https ://clickhouse.yandex/docs/en/table_engines/kafka/
我使用 Kafka Engine 和一个将数据推送到MergeTree表的物化视图创建了一个表。
这是我的表的结构:
CREATE TABLE games (
UserId UInt32,
ActivityType UInt8,
Amount Float32,
CurrencyId UInt8,
Date String
) ENGINE = Kafka('XXXX.eu-west-1.compute.amazonaws.com:9092,XXXX.eu-west-1.compute.amazonaws.com:9092,XXXX.eu-west-1.compute.amazonaws.com:9092', 'games', 'click-1', 'JSONEachRow', '3');
CREATE TABLE tests.games_transactions (
day Date,
UserId UInt32,
Amount Float32,
CurrencyId UInt8,
timevalue DateTime,
ActivityType UInt8
) ENGINE = MergeTree(day, (day, UserId), 8192);
CREATE MATERIALIZED VIEW tests.games_consumer TO tests.games_transactions
AS SELECT toDate(replaceRegexpOne(Date,'\\..*','')) as day, UserId, Amount, CurrencyId, toDateTime(replaceRegexpOne(Date,'\\..*','')) as timevalue, ActivityType
FROM default.games;
在 Kafka 主题中,我每秒收到大约 150 条消息。
一切都很好,数据在表中更新的部分延迟很大,绝对不是实时的。
似乎只有当我达到65536 条准备在 Kafka 中消费的新消息时,数据才会从 Kafka 发送到表
我应该设置一些特定的配置吗?
我试图从 cli 更改配置:
SET max_insert_block_size=1048
SET max_block_size=655
SET stream_flush_interval_ms=750
但是没有任何改善
我应该更改任何特定配置吗?
我应该在创建表之前更改上述配置吗?