我正在编写一个使用 Node 和kafkajs
https://kafka.js.org/docs/sumption实现 2 个消费者的服务
我有 2 个异步函数,它们已经初始化了消费者,理论上正在等待读取消息。像这样的东西:
const { Kafka } = require('kafkajs')
// Create the client with the broker list
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092']
});
const consumer = kafka.consumer({ groupId: 'my-group' });
const consumerModule = async (): Promise<void> => {
await consumer.connect();
await consumer.subscribe({ topic: 'topic-A' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
});
}
export default consumerModule;
如果我希望消费者在服务中后台运行,我应该在初始化服务器时只导入模块文件吗?
我有一种感觉,这是行不通的,因为consumerModule
必须调用函数并且因为它是async
,不确定如何正确实现它。
例如:
import express, { Express } from 'express';
import consumerModuleA from './modules/consumerModuleA';
import consumerModuleB from './modules/consumerModuleB';
async function bootstrap(): Promise<Express> {
const app = express();
app.get('/health', (_req, res) => {
res.send('ok');
});
return app;
}
请指教。