0

如何在物化视图中访问自定义标题?我正在尝试在我的应用程序中构建一些自定义 dlq 逻辑,并希望构建基于标头信息的重试机制。实际重试由调度程序触发,该调度程序应在物化视图中查找这些标头信息。

以下是一些代码片段:

创建物化视图:

@Slf4j
@EnableBinding(DlqBinding.class)
public class DlqRetryService {

    @StreamListener
    public void readTable(@Input(DlqBinding.DLQ_TOPIC) KTable<String, String> table) {
    }
}

public interface DlqBinding {

    String DLQ_TOPIC = "dlq";

    @Input(DLQ_TOPIC)
    KTable<?, ?> dlqInput();
}

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            dlq:
              consumer:
                materializedAs: currentDL
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

调度器:

    public void processDL() {
        ReadOnlyKeyValueStore<Object, Object> currentDL = interactiveQueryService.getQueryableStore("currentDL", QueryableStoreTypes.keyValueStore());

        KeyValueIterator<Object, Object> all = currentDL.all();

        while (all.hasNext()) {
            KeyValue<Object, Object> next = all.next();
            log.info("Found Entry in currentDL: {}", next);
            // some retry logic would be here
        }
    }```
4

1 回答 1

0

我认为不可能使用交互式查询从物化视图中以这种方式访问​​标题。

它不清楚你在重试什么。您是否尝试重新处理记录?KStream您可以使用 a和转换器/处理器 API封装这种逻辑。这是这种模式的蓝图。请记住,这是访问标头的一般方法。您的应用程序可能需要根据您的特定用例进行调整。

@StreamListener
public void processStream(@Input(DlqBinding.DLQ_TOPIC) KStream<String, String> stream) {

stream.process(() -> new Processor() {

            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object key, Object value) {
                final Headers headers = this.context.headers();
                final Iterable<Header> headerXyz = headers
                        .headers("HEADER_KEY");
                // iterate on the header information returned and perform 
                // your application specific logic here.
            }

            @Override
            public void close() {

            }
        });

}
于 2020-03-30T21:45:17.163 回答