我有一些使用 kafka 作为源和接收器的 flink 作业,我想向它添加跟踪,以便从 Kafka 消费/产生的任何消息都可以很好地跟踪,因为我正在使用 kafka 拦截器来拦截消息并记录trace、span 和 parent traceId,因为我将 opentracing-kafka-client(v0.1.11) 与 brave-opentracing(v0.35.1) 结合使用,这是我使用自定义拦截器的原因,因为我需要记录消息以指定的格式。
配置拦截器后,它们被调用,它使用来自上游系统的跟踪信息(来自标头)并记录它,但是当再次向 kafka 生成消息时,跟踪上下文丢失,例如考虑以下场景
1) 一些 rest 服务放在 Kafka 上的消息 2) flink 作业和拦截器消耗的消息启动并使用 header 中的跟踪信息并记录它 3) 处理后由 flink 作业产生到 Kafka 的消息
它在第 2 步之前运行良好,但是在生成消息时,不使用上一步的跟踪信息,因为它没有任何标头信息,因此它会产生全新的跟踪。
我正在注册跟踪器,如下所示:-
public class MyTracer {
private static final Tracer INSTANCE = BraveTracer.create(Tracing.newBuilder().build());
public static void registerTracer() {
GlobalTracer.registerIfAbsent(INSTANCE);
}
public static Tracer getTracer() {
return INSTANCE;
}
}
我正在使用TracingConsumerInterceptor
和TracingProducerInterceptor
来自opentracing kafka。