我们目前在集群拓扑模式下使用 Apache Storm 0.9.5 来处理 Amazon Kinesis 记录 (spout) 并将它们存储到 Redshift 数据仓库 (bolt) 中。我们的 Storm 集群部署在 AWS 中,由 1 个 nimbus + UI 节点、1 个 zookeeper 节点和 3 个 supervisor + logviewer 节点组成。我们的拓扑配置支持处理多个 Kinesis 流,并且它包含的每个流:
- 一个 Kinesis 流喷口用于侦听传入记录
- 一个 Redshift 螺栓,用于将记录插入数据仓库
拓扑:
final TopologyBuilder topologyBuilder = new TopologyBuilder();
// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
final String spoutId = kinesisStreamSpout.getSpoutId();
topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());
// set the corresponding redshift bolt
final String streamName = kinesisStreamSpout.getStreamName();
final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
topologyBuilder.setBolt(redshiftBolt.getId(),
redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}
return topologyBuilder.createTopology();
该系统的一个问题是它无法保证只处理一次输入消息,从而导致将具有相同业务密钥的多条记录插入到目标数据库中。为了了解问题的严重程度,我们进行了一项受控测试,发现大约三分之一的输入记录被多次提交处理。
根据这个线程(目前尚未得到答复),我们也考虑过使用 Trident 来保证一次性处理,但也得出结论,将幂等性内置到系统中更为重要(以及至少-once 语义)而不是像其他文章所建议的那样增加复杂性、降低性能和生成状态。
我们现在正在寻求以支持集群的方式在现有拓扑中实现幂等性的最佳方式的建议。到目前为止,我们倾向于引入一个 RedisBolt,它可以通过元组消息 id 键值。是否存在使用 Apache Storm 实现此目的的现有模式?