我需要使用 .Net 从外部服务器读取 Kafka 消息。作为第一步,我在本地机器上安装了 Kafka,然后编写了 .Net 代码。它按要求工作。然后,我搬到了云端,但代码不起作用。这是我的设置。
我在 Azure 上的 Windows VM(VM1:10.0.0.4)上部署了 Kafka 服务器。它已启动并正在运行。我创建了一个测试主题并使用 cmd 生成了一些消息。为了测试一切正常,我用 cmd 打开了一个消费者并收到了生成的消息。
然后,我使用 Visual Studio 部署了另一个 Windows VM(VM2、10.0.0.5)。两个虚拟机都部署在同一个虚拟网络上,因此我不必担心打开端口或任何其他网络配置。
然后,我复制了我的 Visual Studio 项目代码,然后将引导服务器的 IP 地址更改为指向 Kafka 服务器。然后它不起作用,我读到我必须更改 Kafka 的服务器配置,所以我打开了 server.properties 并将 listeners 属性修改为listeners=PLAINTEXT://10.0.0.4:9092
. 它仍然不起作用。
我在网上搜索并尝试了许多技巧,但它不起作用。我认为首先要向外部服务器(vm1)提供凭证,可能还有其他一些配置。不幸的是, confluent的官方文档很短,示例很少。官方 GitHub上也没有我的案例。我玩过 Consumer Config 类中的“Sasl”属性,但也没有成功。
错误信息是:
%3|1622220986.498|失败|rdkafka#consumer-1| [thrd:10.0.0.4:9092/bootstrap]:10.0.0.4:9092/bootstrap:连接到 ipv4#10.0.0.4:9092 失败:未知错误(在 CONNECT 状态下 21038 毫秒后)错误:10.0.0.4:9092/bootstrap:连接到 ipv4#10.0.0.4:9092 失败:未知错误(在 CONNECT 状态下 21038 毫秒后)错误:1/1 代理已关闭
这是我的 .Net 核心代码:
static void Main(string[] args)
{
string topic = "AzureTopic";
var config = new ConsumerConfig
{
BootstrapServers = "10.0.0.4:9092",
GroupId = "test",
//SecurityProtocol = SecurityProtocol.SaslPlaintext,
//SaslMechanism = SaslMechanism.Plain,
//SaslUsername = "[User]",
//SaslPassword = "[Password]",
AutoOffsetReset = AutoOffsetReset.Latest,
//EnableAutoCommit = false
};
int x = 0;
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topic);
var cancelToken = new CancellationTokenSource();
while (true)
{
// some tasks
}
consumer.Close();