0

为了在 tridentstorm 中应用一些函数,我们将新创建的实例传递each给在流上调用的方法,如下所示:

stream.each(inputFields, new SomeFunc(), outputFields)

其中SomeFunc是 BaseFunc 的后代。

假设我想有一些状态变量SomeFunc

class SomeFunc extends BaseFunction {

  var someState: String = _

  override def execute(tuple: TridentTuple, collector: TridentCollector) = ???
}

如果我将 SomeFunc 组件的并行提示设置为大于 1 的某个值,将会创建多个SomeFunc? 在 SomeFunc 中访问/更新 someState 是线程安全操作吗?如果不是将 SomeClass 定义为类,而是将其定义为一个对象,那会改变吗?

编辑 好的,在用户@Shaw 对他的回答的评论的帮助下,我了解到storm为每个执行者创建了一个storm组件实例(storm/bolt/function/aggregator等)。问题是它是如何做到这一点的?我想知道这种行为的机制

4

1 回答 1

1

我不知道 Trident 究竟是如何工作的,但是在 Storm 中,如果您定义并行提示 > 1 ,您将为该组件创建多个执行程序,这些执行程序是由工作进程产生的线程。

该执行程序将创建 SomeFunc 的 X(任务数,默认为 1)“实例”,并且不会在它们之间共享变量 someState。暴风雨工作时 someState是线程安全的,因为它们在到达组件时“按顺序在自己的线程中”执行元组。

我几乎可以肯定在 Trident 中是一样的,因为它只是 Storm 上的微批处理抽象。

当然你已经阅读过它,但如果没有,我强烈建议你阅读这篇关于 Storm 并行性的精彩文章。

于 2014-09-20T09:31:10.387 回答