1

我正在编写一个使用 Node 和kafkajs https://kafka.js.org/docs/sumption实现 2 个消费者的服务

我有 2 个异步函数,它们已经初始化了消费者,理论上正在等待读取消息。像这样的东西:

const { Kafka } = require('kafkajs')

// Create the client with the broker list
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
});
const consumer = kafka.consumer({ groupId: 'my-group' });

const consumerModule = async (): Promise<void> => {

 await consumer.connect();
 await consumer.subscribe({ topic: 'topic-A' });

 await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
 });
}

export default consumerModule;

如果我希望消费者在服务中后台运行,我应该在初始化服务器时只导入模块文件吗?

我有一种感觉,这是行不通的,因为consumerModule必须调用函数并且因为它是async,不确定如何正确实现它。

例如:

import express, { Express } from 'express';
import consumerModuleA from './modules/consumerModuleA';
import consumerModuleB from './modules/consumerModuleB';

async function bootstrap(): Promise<Express> {

    const app = express();

    app.get('/health', (_req, res) => {
        res.send('ok');
    });

    return app;
}

请指教。

4

1 回答 1

1

我建议以下。笔记:

  1. 您可以通过参数化主题/组/等来重用消费者的实现。
  2. 显式调用消费者模块来运行它。
import express, { Express } from 'express';
import consumerModule from './modules/consumerModule';

consumerModule();

async function bootstrap(): Promise<Express> {

    const app = express();

    app.get('/health', (_req, res) => {
        res.send('ok');
    });

    return app;
}
于 2022-02-09T08:49:34.520 回答