我正在使用cppkafka 库,它是 librdkafka 的包装器,而 C++ Kafka 客户端则用于非常简单的消息流式传输任务。我的消费者类行为怪异,因为接收消息需要相当长的时间。更准确地说,每次接收可执行文件运行并保持运行时,消费者可以正确接收第一批消息,但后续消息大约需要 15 秒才能到达。任何人都知道什么可能性会导致这样的事情(kafka 配置、库特定问题或我的愚蠢错误)?一百万谢谢。
我的接收线程如下
configuration_.set("group.id", 0);
consumer_ = std::make_unique<cppkafka::Consumer>(configuration_);
consumer_->subscribe({TopicTraits<trade::OrderRequest>::topic, TopicTraits<trade::CancelRequest>::topic});
std::thread([this] {
while (working_) {
cppkafka::Message msg = consumer_->poll();
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
ERROR("error occurred while polling message: {}", msg.get_error());
}
} else {
try {
Json j = Json::parse(msg.get_payload());
if (msg.get_topic() == TopicTraits<trade::OrderRequest>::topic) {
INFO("received [order_req], {}", msg.get_payload());
ReceiveOrderRequest(j.get<trade::OrderRequest>());
} else if (msg.get_topic() == TopicTraits<trade::CancelRequest>::topic) {
INFO("received [cancel_req], {}", msg.get_payload());
ReceiveCancelRequest(j.get<trade::CancelRequest>());
}
} catch (const std::exception &e) {
ERROR("error occurred while handling incoming message, {}", e.what());
}
}
}
}
}).detach();