假设我有这份三叉戟工作:
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(
MemcachedState.opaque(serverLocations),
new Count(),
new Fields("count")
)
我如何在 Spark Streaming 中实现同样的目标?我看了看,updateStateByKey
但这似乎将状态保持在内部(而不是将其保持在像 Memcached 这样的外部状态)并且无限期地保持。当我保存它时,它看起来也试图转储每个批次上的所有内容,例如saveAsTextFile
,而不是仅发出在该批次上更新的键值。
我知道我可以简单地与外部状态交互foreachRDD
,但在这种情况下,我如何确保我只处理一次记录?