这是我使用 flink sql API 连接两个表的代码
tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");
String execSQL1 = "select A.speed_sum+COALESCE(B.speed_sum,0.0), " +
"A.cnt+COALESCE(B.cnt,0), " +
"A.window_start_time, A.window_end_time " +
"from A " +
"left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class).;
streamResult.print("streamResult");
我的输出是这样的:
streamA-----------(5078.000000,199,1635333650000,1635333660000)
streamB-----------(1721.388891,111,1635333650000,1635333660000)
streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000) // want to save
如您所见,toRetractStream
API 将生成三条记录。我想知道如何获得最后一条记录,它正确地加起来了A.speed_sum
和B.speed_sum
(A.cnt
和B.cnt
)。