我们正在使用 micronaut/kafka-streams。使用此框架来创建流应用程序,您可以构建如下内容:
@Factory
public class FooTopologyConfig {
@Singleton
@Named
public KStream<String, FooPojo> configureTopology {
return builder.stream("foo-topic-in")
.peek((k,v) -> System.out.println(String.format("key %s, value: %s", k,v))
.to("foo-topic-out");
}
}
这:
- 收到一个
ConfiguredStreamBuilder
(非常轻的包装纸StreamsBuilder
) - 构建并返回流(我们实际上不确定返回流的重要性,但这是一个不同的问题)。
ConfiguredStreamBuilder::build()
(在 上调用相同的StreamsBuilder
方法)稍后由框架调用,并且返回Topology
的内容不可用于 Micronaut 的注入。
我们需要Topology
bean 来记录拓扑的描述(通过Topology::describe
)。
执行以下操作是否安全?
- 调用
ConfiguredStreamBuilder::build
(因此StreamsBuilder::build
)并使用返回的实例Topology
来打印人类可读的描述。 - 允许框架
ConfiguredStreamBuilder::build
稍后再次调用,并使用返回的拓扑的第二个实例来构建应用程序。