0

我们需要监听多个主题,并在每个主题的事件中寻找特定的字段。每个主题事件都是 json 格式,并且保证很少有 json 格式的固定字段。需要从所有这些多个主题中过滤事件,并在每个事件负载中查找特定字段。如果此字段值匹配某种格式,则将这些事件从不同主题发送到一个固定主题,该主题可以由另一个消费者进一步处理。

正在寻找 ksql 是否可以在这种情况下提供帮助 - 我们从多个主题创建一个流,并根据 ksql 流中的固定列过滤数据并将其推送到新主题。我的问题是:1)是否可以从多个主题创建一个 ksql 流?2) 是否可以将主题的完整事件有效负载作为 ksql 流中的一列?

在高层次上,(使用错误的 ksql 语法),我正在寻找类似的东西

CREATE STREAM my_all_topics (myFixedFiedl1 varchar, eventPayload varchar) WITH (value_format = 'json', kafka_topic_LIST='topic1, topic2, topic3');

CREATE STREAM mytopic_stream (myFixedFiedl1 varchar, eventPayload varchar) with (kafka_topic='my-final-topic-name', value_format='json')
as select myFixedField1, eventPayload from my_all_topics where myFixedField1 like 'myprefix%';
4

2 回答 2

1

你不能完全按照你的意愿去做——KSQLSTREAM来源于一个且只有一个 Kafka 主题。

但是您可以使用 KSQL 的INSERT INTO功能来实现您想要的。

  1. 为您的源主题建模:

    CREATE STREAM source_a (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_a', value_format='json')
    CREATE STREAM source_b (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_b', value_format='json')
    CREATE STREAM source_c (myFixedField1 varchar, eventPayload varchar) WITH (kafka_topic='topic_c', value_format='json')
    
  2. 根据第一个源主题创建目标主题:

    CREATE STREAM mytopic_stream (myFixedField1 varchar, eventPayload varchar) AS SELECT myFixedField1, eventPayload from source_a where myFixedField1 like 'myprefix%';
    
  3. 指定从剩余的源主题插入到目标主题:

    INSERT INTO mytopic_stream SELECT myFixedField1, eventPayload from source_b where myFixedField1 like 'myprefix%';
    INSERT INTO mytopic_stream SELECT myFixedField1, eventPayload from source_c where myFixedField1 like 'myprefix%';
    

也可以看看

于 2018-10-31T03:03:55.350 回答
0

我不确定,但似乎您可以将流与JOIN.

CREATE STREAM mytopic_stream AS
    SELECT A.*, B.*, C.*
    FROM stream_A A
        JOIN stream_B B ON A.key = B.key_for_A
        JOIN stream_C C ON A.key = B.key_for_A

如果您尚未使用 KSQL 注册 Kafka 主题,那么您必须首先处理该步骤。

于 2019-04-05T21:24:02.240 回答