无法连接到 AWS MSK 并为客户端代理通信 kafkajs 启用了 sasl_scram 和 TLS 加密
出现错误
{"level":"ERROR","timestamp":"2021-06-28T06:17:28.078Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"XXXXX1:9096","clientId":"XXX-local-producer"}
{"level":"ERROR","timestamp":"2021-06-28T06:17:28.079Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4910}
{"level":"ERROR","timestamp":"2021-06-28T06:17:43.005Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"XXX:9096","clientId":"XXX-local-producer"}
{"level":"ERROR","timestamp":"2021-06-28T06:17:43.005Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":9790}
could not write message KafkaJSNumberOfRetriesExceeded: Connection timeout
{"level":"ERROR","timestamp":"2021-06-28T06:18:42.699Z","logger":"kafkajs","message":"[Connection] Connection error: connect ETIMEDOUT 10.201.21.127:9096","broker":"XXXXXX:9096","clientId":"XXXX-local-producer","stack":"Error: connect ETIMEDOUT 10.201.21.127:9096\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1133:16)"}
{"level":"ERROR","timestamp":"2021-06-28T06:18:52.937Z","logger":"kafkajs","message":"[Connection] Connection error: connect ETIMEDOUT 10.201.22.64:9096","broker":"XXXXXX:9096","clientId":"XXXX-local-producer","stack":"Error: connect ETIMEDOUT 10.201.22.64:9096\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1133:16)"}
{"level":"ERROR","timestamp":"2021-06-28T06:19:05.225Z","logger":"kafkajs","message":"[Connection] Connection error: connect ETIMEDOUT 10.201.21.127:9096","broker":"XXXXXX:9096","clientId":"XXX-local-producer","stack":"Error: connect ETIMEDOUT 10.201.21.127:9096\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1133:16)"}
{"level":"ERROR","timestamp":"2021-06-28T06:19:15.465Z","logger":"kafkajs","message":"[Connection] Connection error: connect ETIMEDOUT 10.201.22.64:9096","broker":"XXXXX:9096","clientId":"XXXX-local-producer","stack":"Error: connect ETIMEDOUT 10.201.22.64:9096\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1133:16)"}
{"level":"ERROR","timestamp":"2021-06-28T06:19:27.753Z","logger":"kafkajs","message":"[Connection] Connection error: connect ETIMEDOUT 10.201.21.127:9096","broker":"bXXXXXX:9096","clientId":"XXX-local-producer","stack":"Error: connect ETIMEDOUT 10.201.21.127:9096\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1133:16)"}
const { Kafka, logLevel } = require('kafkajs')
const fs = require('fs')
const kafka = new Kafka({
clientId: 'local-producer',
brokers: ['XXXXXX1:9096', 'XXXXXX2:9096'],
connectionTimeout: 10000,
authenticationTimeout: 10000,
requestTimeout: 10000,
logLevel: logLevel.DEBUG,
ssl: {
rejectUnauthorized: false,
},
sasl: {
mechanism: 'scram-sha-512',
username: 'msk_user',
password: 'msk_pass'
},
})
const producer = kafka.producer()
console.log('about to hit producer code');
var sendMessage = async () => {
try {
// Producing
console.log('about to hit producer code inside try');
await producer.connect()
console.log('connected producer code');
await producer.send({
topic: 'AWSKafkaPocTopic',
messages: [
{ value: "hi hello EC2 js"},
],
})
await producer.disconnect()
}
catch(err)
{
console.error("could not write message " + err)
}
}
sendMessage();
console.log('about to hit producer code inside END');