我正在尝试创建一个应用程序(在 nodejs 中)以将 kafka 事件流式传输到套接字连接。我使用的库是:kafkajs 和 socket.io
我可以通过以下方式对一个套接字连接执行相同的操作:
const activeUsers = new Set();
const kafka = new Kafka({
clientId: 'BA-Webapp',
brokers: ['XX.XX.XX.XX:9092']
})
const attentionConsumer = kafka.consumer({ groupId: 'XXX' })
// App setup
const PORT = 5000;
const app = express();
const server = app.listen(PORT, function () {
console.log(`Listening on port ${PORT}`);
console.log(`http://localhost:${PORT}`);
});
// Static files
app.use(express.static("public"));
// Socket setup
const io = socket(server);
io.on("connection", async function (socket) {
console.log("Client connected")
socket.on('start-stream', async clientUserId => {
console.log("Client message received")
socket.userId = clientUserId;
activeUsers.add(clientUserId);
console.log(activeUsers)
await attentionConsumer.connect()
await attentionConsumer.subscribe({ topic: 'on_message', fromBeginning: false })
await attentionConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
io.send(message.value.toString(), clientUserId)
},
})
console.log(`Received message => ${message}`)
})
});
我上面所做的是每次客户端连接并发送消息启动流时,我都会创建一个消费者连接,将其订阅到一个主题,并且每次主题收到一条消息时,我都会将其发送回客户端。
这适用于一个客户端,一旦第二个客户端连接,我就会得到预期的消费者错误已经在运行。
如何在套接字范围之外启动消费者(以便它在应用程序加载时启动)并在套接字内部接收任何消息流时(仅当有可用的套接字连接时)?
我对此很陌生,任何帮助将不胜感激。