0

My use case is as follows: I have a Kafka topic A with messages "logically" belonging to different "services". I have no control over the system sending the messages to A.

I want to read such messages from A and dispatch them to a per-service set of topics on the same cluster (let's call them A_1, ..., A_n), based on one column describing the service (the format is CSV-style, but it doesn't matter).

The set of services is static, I don't have to handle addition/removal at the moment.

I was hoping to use Kafka Connect for this task but, surprisingly, there are no Kafka-to-Kafka source/sink connectors (I cannot find the tickets, but such proposals have been rejected).

I have seen MirrorMaker 2, but it looks like overkill for my (simple) use case.

I also know KafkaStreams but I'd rather not write and maintain code just for that.

My question is: is there a way to achieve this topic dispatching with kafka native tools without writing a kafka-consumer/producer myself?

PS: if anybody thinks that MirrorMaker2 could be a good fit I am interested too, I don't know the tool very well.


3 Answers

1

Mirror Maker is for doing ... mirroring. It's useful when you want to mirror one cluster from one data center to the other with the same topics. Your use case is different.

Kafka Connect is for syncing different systems (data from Databases for example) through Kafka topics but I don't see it for this use case either.

I would use a Kafka Streams application for that.
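As a rough illustration of how little logic such an application needs, here is a minimal sketch of the routing function only, in plain Java with no Kafka dependency. The service names, the column index, the `A_<service>` naming scheme and the fallback topic are all assumptions made for the example; the question does not specify them.

```java
import java.util.Set;

public final class TopicRouter {
    // Hypothetical values: the real service names and column index are not given in the question.
    private static final Set<String> SERVICES = Set.of("billing", "search");
    private static final int SERVICE_COLUMN = 0;
    private static final String FALLBACK_TOPIC = "A_unknown";

    /** Returns the target topic name for one CSV-style record value. */
    public static String targetTopic(String csvLine) {
        if (csvLine == null) {
            return FALLBACK_TOPIC;
        }
        // Naive split: assumes no quoted commas at or before the service column.
        String[] columns = csvLine.split(",", -1);
        if (columns.length <= SERVICE_COLUMN) {
            return FALLBACK_TOPIC;
        }
        String service = columns[SERVICE_COLUMN].trim();
        // Unknown services go to the fallback topic instead of an arbitrary topic name.
        return SERVICES.contains(service) ? "A_" + service : FALLBACK_TOPIC;
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("billing,42,ok"));
        System.out.println(targetTopic("mystery,1,ok"));
    }
}
```

In Kafka Streams 2.0 or later, a function like this could be handed to `KStream#to` as a `TopicNameExtractor`, for example `builder.<String, String>stream("A").to((key, value, recordContext) -> TopicRouter.targetTopic(value));`. The target topics should already exist, since Kafka Streams does not create them for you.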

Answered 2019-12-19T17:51:09.753
1

To my knowledge, there is no straightforward way to branch incoming messages to a list of topics based on their content. You need to write custom code to achieve this.

  1. Use the Processor API.
  2. Pass the list of topics to the processor.
  3. Implement the logic that picks the target topic for each record.
  4. Use context.forward to send the record on. Note that To.child takes the name of a child node, such as the sink node registered for the chosen topic, not a topic name as such.

context.forward(key, value, To.child("selected topic"))

Answered 2019-12-19T18:11:21.903
0

All the other answers are right: at the time of writing I did not find any "config-only" solution in the Kafka toolset.

What finally did the trick was to use Logstash, as its Kafka output plugin supports sprintf-style field references (`%{...}`) in the topic_id parameter.

So once you have the "target topic name" available in a field (say service_name) it's as simple as this:

output {
  kafka {
    id => "sink"
    codec => [...]
    bootstrap_servers => [...]
    topic_id => "%{[service_name]}"
    [...]
  }
}

Answered 2020-08-19T12:33:53.430