问题标签 [redis-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 事件驱动架构 - 主题/流设计
这可能是某种最佳实践问题。请从事此工作的人举例说明。让我们所有人都能受益!
对于使用 Kafka / Redis 的事件驱动架构,当我们为事件创建主题/流时,要遵循哪些最佳实践。
让我们考虑在线订单处理工作流程。
我读了一些博客说创建主题/流,如order-created-events
等order-deleted-events
。但我的问题是,当我们将其拆分为多个主题时,如何保证消息的顺序。
例如:
order-created-events 可能有数千个事件并由消费者缓慢处理。假设只有 5-10% 会取消订单,order-deleted-events 在队列中可能只有很少的记录。
现在,让我们假设,用户首先下订单。然后他立即取消。这将使 order-deleted-event 首先处理,因为在某些消费者处理同一订单的 order-created-event 之前,主题/流没有太多消息。会造成一些数据不一致。
希望我的问题很清楚。那么,如何提出主题/流设计?
spring - Spring Data Redis Streams,无法弄清楚我未确认的消息发生了什么?
我正在使用以下代码使用 Spring Data Redis 消费者组来消费 Redis 流,但即使我已经注释掉了确认命令,我的消息在服务器重新启动后也不会重新读取。
我希望如果我没有确认该消息,则应该在服务器被杀死并重新启动时重新读取它。我在这里想念什么?
node.js - 节点 Redis XREAD 阻塞订阅
我正在将一个 redis 发布/订阅系统转换为 redis 流,这样我就可以为我的服务器发送的事件添加一些容错。
订阅传统方式是微不足道的:
这将永久阻塞,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。
切换到流,你使用XREAD
而不是订阅,而XADD
不是发布,并且数据有很大的不同。我正在努力解决的部分是阻塞。
发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。
redis - 从 Redis 流中删除一系列条目
我想删除早于特定条目 ID 的 Redis Stream 条目。但是 XDEL 命令将每个 ID 明确地作为输入。当流中有大量条目时,是否有任何方法可以指定 ID 范围?修剪一系列条目也将帮助我回忆起未完成的记忆。
redis - Redis 保证 XREAD 返回值?
可以使用 XREAD(或者可能是另一个命令)以原子方式检测数据是否写入 Redis 流?
进一步来说:
假设您在一个进程中将一些数据添加到 Redis 流中,并看到数据已通过某个自动生成的密钥成功添加。
此 XADD 完成后,您立即在另一个进程中运行以下读取。
这个 XREAD 是否保证返回数据?文档不清楚成功的 XADD 是否保证读者会立即看到添加的数据,或者是否可能会有一些小的延迟。
redis - 为什么我在读取 redis 流时得到 (nil)
在我的信息流中,我有 1 条待处理消息:
它不能是(零),但仍然......我读到了新消息:
新消息不为空,但如果我阅读消息 - 两者都是(无)
为什么?
python-3.x - influxDB如何实现有序批量插入
我有一个要求,因为 Redis-Stream 结构包含specified length of data
我希望的一个transfer to influxDB in bulk
,但是在传输过程中,批量插入的数据不会与原始数据的顺序相同
数据生成代码示例如下所示
数据读取和保存代码示例如下所示
当我查看数据时,您会看到 Count 数据顺序未按该顺序输出
有没有办法实现顺序批量插入?
如果您能告诉我如何解决它,我将不胜感激?
kotlin - 使用 Redis Streams 和 Spring Data 获取待处理消息
我在我的 Spring Boot 应用程序中使用 Redis Streams。在调度程序中,我经常想要获取所有待处理的消息并检查它们已经处理了多长时间,并在必要时重新触发它们。
我现在的问题是我可以获取待处理的消息,但我不确定如何获取有效负载。
我的第一种方法使用了pending
andrange
操作。这里的缺点是 totalDeliveryCount 没有增加range
- 所以我不能使用 range 方法
我的第二种方法使用了pending
andread
操作。对于读取操作,我可以使用当前 ID 指定偏移量。问题是我只能得到高于指定 ID 的 ID。
因此,当我使用ReadOffset.from('1234-0')
该消息时,我不会收到消息,1234-0
而是收到消息之后的所有内容。有没有办法获得确切的信息并尊重totalDeliveryCount
和elapsedTimeSinceLastDelivery
统计?
我正在使用 spring-data-redis 2.3.1.RELEASE
redis - 使用 XREADGROUP 获取指定的 id
我想知道是否有一个命令或参数可以用来获取指定的 RecordId 的消息,例如
我想要 ID 为 12345-0 的消息,但似乎我收到了 12345-0 之后的第一条消息。
我不能使用 XRANGE,因为它不会更新 deliveryCount 和 lastDeliveryTime,而且它似乎不理解消费者群体的概念。
我也知道
这给了我所有待处理的消息,但这会更新所有消息的deliveryCount,我不希望这样。