如何在物化视图中访问自定义标题?我正在尝试在我的应用程序中构建一些自定义 dlq 逻辑,并希望构建基于标头信息的重试机制。实际重试由调度程序触发,该调度程序应在物化视图中查找这些标头信息。
以下是一些代码片段:
创建物化视图:
@Slf4j
@EnableBinding(DlqBinding.class)
public class DlqRetryService {
@StreamListener
public void readTable(@Input(DlqBinding.DLQ_TOPIC) KTable<String, String> table) {
}
}
public interface DlqBinding {
String DLQ_TOPIC = "dlq";
@Input(DLQ_TOPIC)
KTable<?, ?> dlqInput();
}
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
dlq:
consumer:
materializedAs: currentDL
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
调度器:
public void processDL() {
ReadOnlyKeyValueStore<Object, Object> currentDL = interactiveQueryService.getQueryableStore("currentDL", QueryableStoreTypes.keyValueStore());
KeyValueIterator<Object, Object> all = currentDL.all();
while (all.hasNext()) {
KeyValue<Object, Object> next = all.next();
log.info("Found Entry in currentDL: {}", next);
// some retry logic would be here
}
}```