
I feed data from multiple tables into Kafka; Beam reads the data and then runs SQL over it, but I now get the following error:

Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:141)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:102)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:82)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:248)
    at BeamSqlTest.main(BeamSqlTest.java:65)

Is there a workable solution? Please help!


2 Answers


I think you need to set a schema on your input collection, the PCollection<Row> named apply, with setRowSchema() or setSchema(). The problem is that your schema is dynamic and only defined at runtime (I'm not sure Beam supports this). Could you use a static schema and define it before you start processing the input data?

Also, since your input source is unbounded, you need to apply windowing before the SqlTransform.
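For illustration, here is a minimal sketch of both fixes. The fixed field set (id, name, age, score) and the 30-second window are my assumptions, not from your question, and it assumes the ParDo is changed to build every Row with this one static schema:

// Extra imports needed for this sketch:
// import org.apache.beam.sdk.transforms.windowing.FixedWindows;
// import org.apache.beam.sdk.transforms.windowing.Window;
// import org.joda.time.Duration;

// One static schema, defined up front instead of per element.
Schema userSchema = Schema.builder()
        .addInt32Field("id")
        .addStringField("name")
        .addInt32Field("age")
        .addInt32Field("score")
        .build();

PCollection<Row> rows = apply                    // the PCollection<Row> produced by your ParDo,
        .setRowSchema(userSchema)                // which must now build every Row with userSchema
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30)))); // window the unbounded stream

PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("USER_TABLE"), rows)
        .apply(SqlTransform.query(
                "SELECT COUNT(id) total_count, SUM(score) total_score FROM USER_TABLE GROUP BY id"));

With the schema attached and the stream windowed, SqlTransform can resolve USER_TABLE, and the GROUP BY aggregation fires once per window.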

answered 2019-11-22T18:00:06.973
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.repackaged.sql.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;

class BeamSqlTest {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        options.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(options);

        PCollection<KafkaRecord<String, String>> lines = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("192.168.8.16")
                .withTopic("tmp_table.reuslt")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.of("group.id", "beam_app"))
                .withReadCommitted()
                .commitOffsetsInFinalize());

        PCollection<Row> apply = lines.apply(ParDo.of(new DoFn<KafkaRecord<String, String>,Row>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getKV().getValue(); //data: {id:0001@int,name:test01@string,age:29@int,score:99@int}
                if(!"data_increment_heartbeat".equals(jsonData)){ //Filter out heartbeat information
                    JSONObject jsonObject = JSON.parseObject(jsonData);
                    Schema.Builder builder = Schema.builder();
                    //A data pipeline may have data from multiple tables so the Schema is obtained dynamically
                    //This assumes data from a single table
                    List<Object> list = new ArrayList<Object>();
                    for(String s : jsonObject.keySet()) {
                        String[] dataType = jsonObject.get(s).toString().split("@");   //data@field type
                        if(dataType[1].equals("int")){
                            builder.addInt32Field(s);
                        }else if(dataType[1].equals("string")){
                            builder.addStringField(s);
                        }
                        list.add(dataType[0]);
                    }
                    Schema schema = builder.build();
                    Row row = Row.withSchema(schema).addValues(list).build();
                    System.out.println(row);
                    c.output(row);
                }
            }
        }));
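
        // NOTE: no setRowSchema(...) is attached to the resulting PCollection<Row>, and each
        // element builds its own Schema at runtime; this is why the SqlTransform below fails
        // with "Cannot call getSchema when there is no schema".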

        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("USER_TABLE"), apply)
                .apply(SqlTransform.query("SELECT COUNT(id) total_count, SUM(score) total_score FROM USER_TABLE GROUP BY id"));

        result.apply( "log_result", MapElements.via( new SimpleFunction<Row, Row>() {
            @Override
            public Row apply(Row input) {
                System.out.println("USER_TABLE result: " + input.getValues());
                return input;
            }
        }));

        p.run().waitUntilFinish();
    }
}
answered 2019-11-22T10:18:49.357