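When converting a DataStream to a Table, we have the opportunity to specify an org.apache.flink.table.api.Schema to adjust the mapping between Java types and SQL types, and to declare metadata such as watermarks.
This snippet works in my case: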
import java.time.LocalDateTime;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<String, LocalDateTime>> dataStream = env.fromElements(
        Tuple2.of("Alice", LocalDateTime.parse("2021-11-16T08:19:30.123")),
        Tuple2.of("Bob", LocalDateTime.parse("2021-11-16T08:19:31.123")),
        Tuple2.of("John", LocalDateTime.parse("2021-11-16T08:19:32.123")));

// note that "f0" and "f1" here come from the field names in Tuple2
Table inputTable = tableEnv.fromDataStream(dataStream,
        Schema.newBuilder()
                .column("f0", "STRING")
                .column("f1", "TIMESTAMP(3)")
                .watermark("f1", "SOURCE_WATERMARK()")
                .build()
);

tableEnv.createTemporaryView("input_table", inputTable);
tableEnv.executeSql("DESCRIBE input_table").print();

tableEnv.executeSql("" +
        " SELECT " +
        " UPPER(f0) AS name, " +
        " f1 AS datetime, " +
        " date_format(f1, 'yyyy') AS event_year " +
        " FROM input_table "
).print();
which prints:
+------+------------------------+------+-----+--------+--------------------+
| name |                   type | null | key | extras |          watermark |
+------+------------------------+------+-----+--------+--------------------+
|   f0 |                 STRING | true |     |        |                    |
|   f1 | TIMESTAMP(3) *ROWTIME* | true |     |        | SOURCE_WATERMARK() |
+------+------------------------+------+-----+--------+--------------------+
and
+----+--------------------------------+-------------------------+--------------------------------+
| op |                           name |                datetime |                     event_year |
+----+--------------------------------+-------------------------+--------------------------------+
| +I |                          ALICE | 2021-11-16 08:19:30.123 |                           2021 |
| +I |                            BOB | 2021-11-16 08:19:31.123 |                           2021 |
| +I |                           JOHN | 2021-11-16 08:19:32.123 |                           2021 |
+----+--------------------------------+-------------------------+--------------------------------+
I find DESCRIBE very handy for debugging these cases.
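A side note on the year pattern passed to date_format in the query: in Java date patterns, lowercase y is the calendar year and uppercase Y is the week-based year, and the two can disagree in the days around New Year. The sketch below uses plain java.time rather than Flink, so it only illustrates the pattern letters; that Flink's date_format treats them the same way is an assumption based on its documentation describing the format string as compatible with Java's SimpleDateFormat. The class name YearPatternDemo and the helper formatYear are made up for this illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class YearPatternDemo {

    // Hypothetical helper: formats an ISO date such as "2021-12-31" with the given pattern.
    // Locale.US is pinned because week-based fields depend on the locale's week rules.
    static String formatYear(String pattern, String isoDate) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern, Locale.US);
        return LocalDate.parse(isoDate).format(formatter);
    }

    public static void main(String[] args) {
        // Mid-November: both patterns agree, as in the query output above.
        System.out.println(formatYear("yyyy", "2021-11-16")); // 2021
        System.out.println(formatYear("YYYY", "2021-11-16")); // 2021

        // End of December: with US week rules this week already counts
        // towards the next week-based year.
        System.out.println(formatYear("yyyy", "2021-12-31")); // 2021
        System.out.println(formatYear("YYYY", "2021-12-31")); // 2022
    }
}
```

So for a column like event_year, 'yyyy' is probably the safer choice unless a week-based year is really what you want.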
See also here for more details: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
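Note that the conversion between DataStream and Table was improved in 1.13, and the syntax changed somewhat. That is what the "legacy" section of that documentation refers to. You may stumble upon examples of that legacy syntax in older SO posts.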
It may also help to check the correspondence between Java types and SQL types described here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type