问题标签 [redis-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
48 浏览

apache-kafka - 事件驱动架构 - 主题/流设计

这可能是某种最佳实践问题。请从事此工作的人举例说明。让我们所有人都能受益!

对于使用 Kafka / Redis 的事件驱动架构,当我们为事件创建主题/流时,要遵循哪些最佳实践。

让我们考虑在线订单处理工作流程。

我读了一些博客说创建主题/流,如order-created-eventsorder-deleted-events。但我的问题是,当我们将其拆分为多个主题时,如何保证消息的顺序。

例如:

order-created-events 可能有数千个事件并由消费者缓慢处理。假设只有 5-10% 会取消订单,order-deleted-events 在队列中可能只有很少的记录。

现在,让我们假设,用户首先下订单。然后他立即取消。这将使 order-deleted-event 首先处理,因为在某些消费者处理同一订单的 order-created-event 之前,主题/流没有太多消息。会造成一些数据不一致。

希望我的问题很清楚。那么,如何提出主题/流设计?

0 投票
1 回答
1326 浏览

spring - Spring Data Redis Streams,无法弄清楚我未确认的消息发生了什么?

我正在使用以下代码使用 Spring Data Redis 消费者组来消费 Redis 流,但即使我已经注释掉了确认命令,我的消息在服务器重新启动后也不会重新读取。

我希望如果我没有确认该消息,则应该在服务器被杀死并重新启动时重新读取它。我在这里想念什么?

0 投票
1 回答
1668 浏览

node.js - 节点 Redis XREAD 阻塞订阅

我正在将一个 redis 发布/订阅系统转换为 redis 流,这样我就可以为我的服务器发送的事件添加一些容错。

订阅传统方式是微不足道的:

这将永久阻塞,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。

切换到流,你使用XREAD而不是订阅,而XADD不是发布,并且数据有很大的不同。我正在努力解决的部分是阻塞。

发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。

0 投票
1 回答
645 浏览

redis - 从 Redis 流中删除一系列条目

我想删除早于特定条目 ID 的 Redis Stream 条目。但是 XDEL 命令将每个 ID 明确地作为输入。当流中有大量条目时,是否有任何方法可以指定 ID 范围?修剪一系列条目也将帮助我回忆起未完成的记忆。

0 投票
1 回答
383 浏览

redis - Redis 保证 XREAD 返回值?

可以使用 XREAD(或者可能是另一个命令)以原子方式检测数据是否写入 Redis 流?

进一步来说:

假设您在一个进程中将一些数据添加到 Redis 流中,并看到数据已通过某个自动生成的密钥成功添加。

此 XADD 完成后,您立即在另一个进程中运行以下读取。

这个 XREAD 是否保证返回数据?文档不清楚成功的 XADD 是否保证读者会立即看到添加的数据,或者是否可能会有一些小的延迟。

0 投票
1 回答
519 浏览

redis - 为什么我在读取 redis 流时得到 (nil)

在我的信息流中,我有 1 条待处理消息:

它不能是(零),但仍然......我读到了新消息:

新消息不为空,但如果我阅读消息 - 两者都是(无)

为什么?

0 投票
0 回答
68 浏览

python-3.x - influxDB如何实现有序批量插入

我有一个要求,因为 Redis-Stream 结构包含specified length of data我希望的一个transfer to influxDB in bulk,但是在传输过程中,批量插入的数据不会与原始数据的顺序相同

数据生成代码示例如下所示

数据读取和保存代码示例如下所示

当我查看数据时,您会看到 Count 数据顺序未按该顺序输出

有没有办法实现顺序批量插入?

如果您能告诉我如何解决它,我将不胜感激?

0 投票
1 回答
1100 浏览

python-3.x - 使用 influxdb-python 的 Influxdb 批量插入

0 投票
1 回答
779 浏览

kotlin - 使用 Redis Streams 和 Spring Data 获取待处理消息

我在我的 Spring Boot 应用程序中使用 Redis Streams。在调度程序中,我经常想要获取所有待处理的消息并检查它们已经处理了多长时间,并在必要时重新触发它们。

我现在的问题是我可以获取待处理的消息,但我不确定如何获取有效负载。

我的第一种方法使用了pendingandrange操作。这里的缺点是 totalDeliveryCount 没有增加range- 所以我不能使用 range 方法

我的第二种方法使用了pendingandread操作。对于读取操作,我可以使用当前 ID 指定偏移量。问题是我只能得到高于指定 ID 的 ID。

因此,当我使用ReadOffset.from('1234-0')该消息时,我不会收到消息,1234-0而是收到消息之后的所有内容。有没有办法获得确切的信息并尊重totalDeliveryCountelapsedTimeSinceLastDelivery统计?

我正在使用 spring-data-redis 2.3.1.RELEASE

0 投票
1 回答
300 浏览

redis - 使用 XREADGROUP 获取指定的 id

我想知道是否有一个命令或参数可以用来获取指定的 RecordId 的消息,例如

我想要 ID 为 12345-0 的消息,但似乎我收到了 12345-0 之后的第一条消息。

我不能使用 XRANGE,因为它不会更新 deliveryCount 和 lastDeliveryTime,而且它似乎不理解消费者群体的概念。

我也知道

这给了我所有待处理的消息,但这会更新所有消息的deliveryCount,我不希望这样。