9

我正在将一个 redis 发布/订阅系统转换为 redis 流,这样我就可以为我的服务器发送的事件添加一些容错。

订阅传统方式是微不足道的:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

这将永久阻塞,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

切换到流,你使用XREAD而不是订阅,而XADD不是发布,并且数据有很大的不同。我正在努力解决的部分是阻塞。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。

4

1 回答 1

12

说实话,我只问了这个问题,因为谷歌对我没有好处,而且我找不到其他人发布关于这个问题的帖子。希望这可以帮助!

因此,XREAD仅在初始呼叫时阻塞。它会坐下来等待一段设定的时间(或者如果您将时间设置为 0 则无限期地等待),但是一旦它接收到数据,它的职责就被认为已经完成,并且它会解除阻塞。要保持“订阅”处于活动状态,您需要XREAD使用流中的最新 id 再次调用。这将替换$我们传递给它的初始值。

递归似乎是一个完美的解决方案:

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    str[0][1].forEach(message => {
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    setTimeout(() => xread({ stream, id }), 0)
  });
}

xread({ stream: 'asdf', id: '$' })

于 2020-06-03T18:03:32.410 回答