0

我们正在使用 micronaut/kafka-streams。使用此框架来创建流应用程序,您可以构建如下内容:

@Factory
public class FooTopologyConfig {
  @Singleton
  @Named
  public KStream<String, FooPojo> configureTopology {
    return builder.stream("foo-topic-in")
           .peek((k,v) -> System.out.println(String.format("key %s, value: %s", k,v))
           .to("foo-topic-out");
  }
}

这:

  • 收到一个ConfiguredStreamBuilder(非常轻的包装纸StreamsBuilder
  • 构建并返回流(我们实际上不确定返回流的重要性,但这是一个不同的问题)。

ConfiguredStreamBuilder::build()(在 上调用相同的StreamsBuilder方法)稍后由框架调用,并且返回Topology的内容不可用于 Micronaut 的注入。

我们需要Topologybean 来记录拓扑的描述(通过Topology::describe)。

执行以下操作是否安全?

  • 调用ConfiguredStreamBuilder::build(因此StreamsBuilder::build)并使用返回的实例Topology来打印人类可读的描述。
  • 允许框架ConfiguredStreamBuilder::build稍后再次调用,并使用返回的拓扑的第二个实例来构建应用程序。
4

1 回答 1

1

build()多次调用应该没有问题。这在 Streams 的内部代码和测试中很常见。

回答你的另一个问题。builder.stream()如果您想稍后扩展拓扑的该分支,则只需要来自操作的流。

于 2020-12-08T01:23:35.810 回答