1

我们目前在集群拓扑模式下使用 Apache Storm 0.9.5 来处理 Amazon Kinesis 记录 (spout) 并将它们存储到 Redshift 数据仓库 (bolt) 中。我们的 Storm 集群部署在 AWS 中,由 1 个 nimbus + UI 节点、1 个 zookeeper 节点和 3 个 supervisor + logviewer 节点组成。我们的拓扑配置支持处理多个 Kinesis 流,并且它包含的每个流:

  • 一个 Kinesis 流喷口用于侦听传入记录
  • 一个 Redshift 螺栓,用于将记录插入数据仓库

拓扑:

final TopologyBuilder topologyBuilder = new TopologyBuilder();

// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
    final String spoutId = kinesisStreamSpout.getSpoutId();
    topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());

    // set the corresponding redshift bolt
    final String streamName = kinesisStreamSpout.getStreamName();
    final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
    topologyBuilder.setBolt(redshiftBolt.getId(),
        redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}

return topologyBuilder.createTopology();

该系统的一个问题是它无法保证只处理一次输入消息,从而导致将具有相同业务密钥的多条记录插入到目标数据库中。为了了解问题的严重程度,我们进行了一项受控测试,发现大约三分之一的输入记录被多次提交处理。

根据这个线程(目前尚未得到答复),我们也考虑过使用 Trident 来保证一次性处理,但也得出结论,将幂等性内置到系统中更为重要(以及至少-once 语义)而不是像其他文章所建议的那样增加复杂性、降低性能和生成状态。

我们现在正在寻求以支持集群的方式在现有拓扑中实现幂等性的最佳方式的建议。到目前为止,我们倾向于引入一个 RedisBolt,它可以通过元组消息 id 键值。是否存在使用 Apache Storm 实现此目的的现有模式?

4

1 回答 1

0

如果您不想使用 Trident,您可能需要阅读以下有关“事务拓扑”的文章。这是 Trident 背后的概念,您仍然可以“手动”应用它。对于您的用例来说,这似乎是一个很好的模式:https ://storm.apache.org/documentation/Transactional-topologies.html

此外,我想补充一点,Storm(与 Apache Flink [免责声明:我是 Flink 的提交者] 和 Apache Spark Streaming 等任何其他系统一样)只能保证在系统进行完全一次处理。如果数据被转发到外部系统,只有当且仅当外部系统可以支持幂等操作时,才能实现精确一次。

于 2015-09-28T09:01:14.173 回答