0

我正在尝试创建一个应用程序(在 nodejs 中)以将 kafka 事件流式传输到套接字连接。我使用的库是:kafkajs 和 socket.io

我可以通过以下方式对一个套接字连接执行相同的操作:

const activeUsers = new Set();
const kafka = new Kafka({
    clientId: 'BA-Webapp',
    brokers: ['XX.XX.XX.XX:9092']
})


const attentionConsumer = kafka.consumer({ groupId: 'XXX' })


// App setup
const PORT = 5000;
const app = express();
const server = app.listen(PORT, function () {
    console.log(`Listening on port ${PORT}`);
    console.log(`http://localhost:${PORT}`);
});

// Static files
app.use(express.static("public"));

// Socket setup
const io = socket(server);

io.on("connection", async function (socket) {

    console.log("Client connected")


    socket.on('start-stream', async clientUserId => {
        console.log("Client message received")
        socket.userId = clientUserId;
        activeUsers.add(clientUserId);
        console.log(activeUsers)
        await attentionConsumer.connect()
        await attentionConsumer.subscribe({ topic: 'on_message', fromBeginning: false })

        await attentionConsumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                
                io.send(message.value.toString(), clientUserId)
            },
        })
        console.log(`Received message => ${message}`)
    })
});

我上面所做的是每次客户端连接并发送消息启动流时,我都会创建一个消费者连接,将其订阅到一个主题,并且每次主题收到一条消息时,我都会将其发送回客户端。

这适用于一个客户端,一旦第二个客户端连接,我就会得到预期的消费者错误已经在运行。

如何在套接字范围之外启动消费者(以便它在应用程序加载时启动)并在套接字内部接收任何消息流时(仅当有可用的套接字连接时)?

我对此很陌生,任何帮助将不胜感激。

4

0 回答 0