为了在 tridentstorm 中应用一些函数,我们将新创建的实例传递each
给在流上调用的方法,如下所示:
stream.each(inputFields, new SomeFunc(), outputFields)
其中SomeFunc
是 BaseFunc 的后代。
假设我想有一些状态变量SomeFunc
:
class SomeFunc extends BaseFunction {
var someState: String = _
override def execute(tuple: TridentTuple, collector: TridentCollector) = ???
}
如果我将 SomeFunc 组件的并行提示设置为大于 1 的某个值,将会创建多个SomeFunc
? 在 SomeFunc 中访问/更新 someState 是线程安全操作吗?如果不是将 SomeClass 定义为类,而是将其定义为一个对象,那会改变吗?
编辑 好的,在用户@Shaw 对他的回答的评论的帮助下,我了解到storm为每个执行者创建了一个storm组件实例(storm/bolt/function/aggregator等)。问题是它是如何做到这一点的?我想知道这种行为的机制