
First, we have a Kafka input source in JSON format:

{"event_time": "2020-08-23 18:36:10", "word": "apple", "cnt": 1}
{"event_time": "2020-08-23 18:36:20", "word": "banana", "cnt": 1}
{"event_time": "2020-08-23 18:37:30", "word": "apple", "cnt": 2}
{"event_time": "2020-08-23 18:37:40", "word": "apple", "cnt": 1}
... ...

What I want to do is sum up the count of each word per minute:

+---------+----------+---------------------+
| word    | SUM(cnt) |   window_start      |
+---------+----------+---------------------+
| apple   |    1     | 2020-08-23 18:36:00 |
+---------+----------+---------------------+
| banana  |    1     | 2020-08-23 18:36:00 |
+---------+----------+---------------------+
| apple   |    3     | 2020-08-23 18:37:00 |
+---------+----------+---------------------+

So this case fits the following Beam SQL statement very well:

SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start
FROM t_count_stats
GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)

Here is my current working code that uses Beam's Java SDK to execute this streaming SQL query:

import avro.shaded.com.google.common.collect.ImmutableMap;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.List;

public class KafkaBeamSqlTest {

    private static DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // create pipeline
        PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args)
                .withoutStrictParsing()
                .as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(kafkaOption);

        // create kafka IO
        KafkaIO.Read<String, String> kafkaRead =
                KafkaIO.<String, String>read()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamKafkaTest")
                        .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
                        .withReadCommitted()
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .commitOffsetsInFinalize();

        // read from kafka
        PCollection<KV<String, String>> messages = pipeline.apply(kafkaRead.withoutMetadata());

        // build input schema
        Schema inputSchema = Schema.builder()
                .addStringField("word")
                .addDateTimeField("event_time")
                .addInt32Field("cnt")
                .build();

        // convert kafka message to Row
        PCollection<Row> rows = messages.apply(ParDo.of(new DoFn<KV<String, String>, Row>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getValue();

                // parse json
                JSONObject jsonObject = JSON.parseObject(jsonData);

                // build row
                List<Object> list = new ArrayList<>();
                list.add(jsonObject.get("word"));
                list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
                list.add(jsonObject.get("cnt"));
                Row row = Row.withSchema(inputSchema)
                        .addValues(list)
                        .build();

                System.out.println(row);

                // emit row
                c.output(row);
            }
        }))
                .setRowSchema(inputSchema);

        // sql query
        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("t_count_stats"), rows)
                .apply(
                        SqlTransform.query(
                                "SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start\n" +
                                "FROM t_count_stats\n" +
                                "GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)"
                        )
                );

        // sink results back to another kafka topic
        result.apply(MapElements.via(new SimpleFunction<Row, KV<String, String>>() {
            @Override
            public KV<String, String> apply(Row input) {

                System.out.println("result: " + input.getValues());

                return KV.of(input.getValue("word"), "result=" + input.getValues());
            }
        }))
                .apply(KafkaIO.<String, String>write()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamPrint")
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));

        // run
        pipeline.run();
    }
}

My problem is that when I run this code and feed some messages into Kafka, no exception is thrown and the pipeline does receive messages from Kafka, but I never see the windowed aggregation being triggered, and no results come out as expected (like the table I showed above).
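
To narrow it down, the only sanity check I have come up with so far (just a sketch, not something I have verified) is to run the same SQL over a small bounded in-memory PCollection of Rows, reusing the inputSchema and dtf from the code above, to see whether the TUMBLE aggregation itself produces the expected table when the source is not unbounded Kafka:

// Debugging sketch only (extra import: org.apache.beam.sdk.transforms.Create).
// Feed a few hard-coded Rows matching the sample data through the same SQL,
// to check the aggregation independently of the unbounded Kafka source.
PCollection<Row> boundedRows = pipeline.apply(
        Create.of(
                Row.withSchema(inputSchema)
                        .addValues("apple", dtf.parseDateTime("2020-08-23 18:36:10"), 1).build(),
                Row.withSchema(inputSchema)
                        .addValues("banana", dtf.parseDateTime("2020-08-23 18:36:20"), 1).build(),
                Row.withSchema(inputSchema)
                        .addValues("apple", dtf.parseDateTime("2020-08-23 18:37:30"), 2).build(),
                Row.withSchema(inputSchema)
                        .addValues("apple", dtf.parseDateTime("2020-08-23 18:37:40"), 1).build())
                .withRowSchema(inputSchema));

PCollection<Row> boundedResult = PCollectionTuple.of(new TupleTag<>("t_count_stats"), boundedRows)
        .apply(SqlTransform.query(
                "SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start\n" +
                "FROM t_count_stats\n" +
                "GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)"));

If this bounded version does emit the three aggregated rows, I would conclude that the SQL itself is fine and the problem is on the unbounded/watermark side.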

So, does Beam SQL currently support the windowing syntax on an unbounded Kafka input source? If it does, what is wrong with my current code? How can I debug and fix it? And is there any code example that integrates Beam SQL with KafkaIO?
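
One direction I am unsure about (a sketch only, based on my guess that the elements carry Kafka processing-time timestamps rather than the event_time field, so the watermark never passes the end of the TUMBLE windows) is attaching an event-time timestamp policy to the Kafka reader, for example with CustomTimestampPolicyWithLimitedDelay. Is something like this what is missing? The variable name kafkaReadWithEventTime below is just for illustration; it would replace kafkaRead in pipeline.apply(kafkaRead.withoutMetadata()).

// Sketch only -- assumes the missing piece is an event-time watermark on the Kafka source.
// Extra imports this would need:
//   org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay
//   org.apache.beam.sdk.io.kafka.KafkaRecord
//   org.joda.time.Duration
KafkaIO.Read<String, String> kafkaReadWithEventTime = kafkaRead
        .withTimestampPolicyFactory((tp, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<>(
                        (KafkaRecord<String, String> record) -> {
                            // extract event_time from the JSON value and use it as the record timestamp
                            JSONObject json = JSON.parseObject(record.getKV().getValue());
                            return dtf.parseDateTime(json.getString("event_time")).toInstant();
                        },
                        Duration.standardMinutes(1),  // max expected out-of-orderness (a guess)
                        previousWatermark));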

Please help me! Thank you very much!!


1 Answer