2

假设我有这份三叉戟工作:

TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(
            MemcachedState.opaque(serverLocations),
            new Count(),
            new Fields("count")
        )                

我如何在 Spark Streaming 中实现同样的目标?我看了看,updateStateByKey但这似乎将状态保持在内部(而不是将其保持在像 Memcached 这样的外部状态)并且无限期地保持。当我保存它时,它看起来也试图转储每个批次上的所有内容,例如saveAsTextFile,而不是仅发出在该批次上更新的键值。

我知道我可以简单地与外部状态交互foreachRDD,但在这种情况下,我如何确保我只处理一次记录?

4

1 回答 1

2

经过一番研究,这是我发现的:

  • Spark Streaming 仅支持输出操作的 at-least-once 语义,因此它无法提供 Trident 的 Exactly-once 语义(至少无需编写自己的代码)。
  • updateStateByKey 可以提供完全一次的语义,但这依赖于它的输出完全替换了以前的输出(它在每个检查点上发出整个状态)。这使得它无法用于任何不重要的状态。此外,没有办法用现有数据初始化状态。如果您重新启动工作,状态将重置(至少这是我的理解)。有计划在 1.3.0 中添加此功能。

总之,如果您想使用 Trident 进行事务更新似乎是安全的选择。

于 2015-01-10T12:24:06.877 回答