我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也存储在 S3 中。这使得可以查看计算中每个中间步骤的数据并查看每个操作的效果。
我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在以下步骤中再次出现(可能在不同的列中)时,我知道两行相互关联。
第一个明显的候选似乎是ROW_NUMBER()
函数,但是:
它似乎不在表表达式 API 的任何地方。我必须构造 SQL 字符串吗?
我该如何使用它?当我尝试这个查询时:
SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
我收到此错误:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
它总是需要对表格进行排序吗?这似乎是我宁愿避免的开销。
下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,相同的 UUID 永远不会被使用两次,所以它完全没用。这是一个例子:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Sandbox {
def main(args: Array[String]): Unit = {
val env = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment
)
val inp = env.fromValues(1.as("id"))
val out1 = inp.addColumns(uuid().as("u"))
val out2 = out1.addColumns($"u".as("u2"))
env.executeSql("""
CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.executeSql("""
CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.createStatementSet()
.addInsert("out1", out1)
.addInsert("out2", out2)
.execute()
// Equivalent to the createStatementSet method:
out1.executeInsert("out1")
out2.executeInsert("out2")
}
}
我得到的输出:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)
我需要 UUIDout1
重新出现out2
在两列中,例如:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)
我想这是由于文档中的这个注释:
此函数不是确定性的,这意味着将为每条记录重新计算该值。
如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到out1
和out2
?
我使用用户定义的函数得到了类似的结果:
class uuidUdf extends ScalarFunction {
def eval(): String = UUID.randomUUID().toString
}
val out1 = inp.addColumns(call(new uuidUdf()).as("u"))