我正在将一个 redis 发布/订阅系统转换为 redis 流,这样我就可以为我的服务器发送的事件添加一些容错。
订阅传统方式是微不足道的:
import { createClient } from 'redis';
const redisOptions = {
url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);
redis.on("message", (channel, message) => {
console.log(channel);
console.log(message);
});
redis.subscribe('foo');
这将永久阻塞,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。
const json = { a: 1, b: 2 };
redis.publish('foo', JSON.stringify(json));
切换到流,你使用XREAD
而不是订阅,而XADD
不是发布,并且数据有很大的不同。我正在努力解决的部分是阻塞。
redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
if (err) return console.error('Error reading from stream:', err);
str.forEach(message => {
console.log(message);
});
}
发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。