我们需要监听多个主题,并在每个主题的事件中寻找特定的字段。每个主题事件都是 json 格式,并且保证很少有 json 格式的固定字段。需要从所有这些多个主题中过滤事件,并在每个事件负载中查找特定字段。如果此字段值匹配某种格式,则将这些事件从不同主题发送到一个固定主题,该主题可以由另一个消费者进一步处理。
正在寻找 ksql 是否可以在这种情况下提供帮助 - 我们从多个主题创建一个流,并根据 ksql 流中的固定列过滤数据并将其推送到新主题。我的问题是:1)是否可以从多个主题创建一个 ksql 流?2) 是否可以将主题的完整事件有效负载作为 ksql 流中的一列?
在高层次上,(使用错误的 ksql 语法),我正在寻找类似的东西
CREATE STREAM my_all_topics (myFixedFiedl1 varchar, eventPayload varchar) WITH (value_format = 'json', kafka_topic_LIST='topic1, topic2, topic3');
CREATE STREAM mytopic_stream (myFixedFiedl1 varchar, eventPayload varchar) with (kafka_topic='my-final-topic-name', value_format='json')
as select myFixedField1, eventPayload from my_all_topics where myFixedField1 like 'myprefix%';