0

我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入一个主题。如何在 Spring Cloud Streams Kafka 应用程序中完成?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  
4

1 回答 1

0

你不能。该process()方法是一种终端操作,不允许您向下游发出数据。相反,您可以使用transform()虽然(它基本上是相同的,process()但允许您向下游发出数据);或取决于您的应用程序transformValues()flatTransform()

使用transform()KStream回来,你可以写成一个主题。

于 2020-05-02T21:43:54.030 回答