我对使用 C++ 的 rdkafkacpp.h Kafka 库相当陌生。
我参考了一些可用的在线资源,关于如何在 Windows 上设置 Kafka 消费者,使用提供的标准 Kafka 环境设置 .bat 文件:https : //kafka.apache.org/quickstart能够测试从生产者终端发送的消息是否出现在消费者终端上)
我已经阅读了 Kafka 的基本理论并浏览了以下文档:https ://docs.confluent.io/4.0.0/clients/librdkafka
在阅读完这些基本内容后,我尝试使用内置库函数编写示例代码,如下所示。但是,我在尝试设置“conf”的引导服务器属性的代码行收到运行时异常。该异常是访问冲突异常。“在 mykafka.exe 中的 0x00007FFB878EC3F9 (msvcr120.dll) 处引发异常:0xC0000005:访问冲突读取位置 0x0000008E5A900000。”
我怀疑使用库函数的 Kafka 消费者“实现”的顺序错过了一些步骤,或者必须重新排序。
我试图保持实现简单,我的本地机器上只有一个生产者(localhost:9092),只有这个消费者(mykafka.exe)。此外,一个主题“快速启动事件”已在终端上启动。
任何帮助是极大的赞赏!
PS:Visual Studio 2019 用于此代码开发。
#include <iostream>
#include "..\include\librdkafka\rdkafkacpp.h"
#include <process.h> // to use exit()
using namespace std;
using namespace RdKafka;
int configAsKafkaConsumer()
{
string host = "localhost:9092";
string errstr;
/* Set properties */
cout << "Inside configAsKafkaConsumer()" << endl;
// Create configuration objects
RdKafka::Conf* conf = NULL;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (conf != NULL)
{
cout << "conf != NULL" << endl;
}
// THIS IS WHERE I'M GETTING THE RUNTIME EXCEPTION!!
if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK)
{
cerr << "Failed to set config of broker: " << errstr << endl;
exit(1);
}
if (conf->set("client.id", host, errstr) != RdKafka::Conf::CONF_OK)
{
cout << "client.id:" << endl;
exit(1);
}
if (conf->set("group.id", "foo", errstr) != RdKafka::Conf::CONF_OK)
{
cout << "group.id:" << endl;
exit(1);
}
// Create a consumer handle
Consumer* ConsumerHandle = RdKafka::Consumer::create(conf, errstr);
string errstr;
string prodTopic = "quickstart-events";
cout << "Creating topic handle" << endl;
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// Create topic handle.
RdKafka::Topic* topic = RdKafka::Topic::create(ConsumerHandle, prodTopic,
tconf, errstr);
if (!topic)
{
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
cout << "Starting the consumer handle" << endl;
ConsumerHandle->start(topic, 1, 0);
cout << "Consuming the message" << endl;
Message* msg = ConsumerHandle->consume(topic, 1, 10000);
cout << "Message is: " << msg->payload() << endl;
}