我想使用 Kafka 消息,因此我emit()
首先调用了该消息,该消息成功地将消息存储在 Kafka 中。
我可以通过运行批处理文件看到:bin/windows/kafka-console-consumer.bat --topic signals --from-beginning --bootstrap-server localhost:9092
并记录新消息。{"botId":"TKS","emailToken":"38fhsf29h","pair":"xy","size":100}
这是我的 nodejs/nestjs 服务文件。我的脚本中缺少什么 - 因为同一个脚本成功写入 Kafka?我已经尝试过@MessagePattern
并@EventPattern
使用该消息。有任何想法吗?
import { Injectable, OnModuleInit } from '@nestjs/common';
import { TVSignal } from './tvsignal';
import { Client, ClientKafka, EventPattern, MessagePattern, Payload, Transport } from "@nestjs/microservices";
import { Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
@Injectable()
export class AppService implements OnModuleInit {
constructor(private eventEmitter: EventEmitter2) {}
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'trading-signal-receiver',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'signals',
allowAutoTopicCreation: true
},
producer: {
}
}
})
kafkaClient: ClientKafka;
async onModuleInit() {
this.kafkaClient.subscribeToResponseOf('signals');
await this.kafkaClient.connect();
Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))
}
// DOES NOT WORK
@EventPattern('signals')
async handleEntityCreated(payload: any) {
Logger.log("RECEIVED NEW: "+ JSON.stringify(payload));
}
getTradingSignals(): string {
return ""
}
// DOES NOT WORK
@MessagePattern('signals') // Our topic name
getMessage(@Payload() message) {
Logger.log("RECEIVED KAFKA MSG" + message.value);
return 'Hello World';
}
// WORKS
storeSignal(signal: TVSignal){
Logger.log("STORED: " + JSON.stringify(signal))
Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))
// send message to OnEvent
this.eventEmitter.emit('signal.saved', signal);
// store obj in kafka
return this.kafkaClient.emit('signals', signal); // args - topic, message
}
}