问题标签 [spring-cloud-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - 在运行时更改 spring-cloud-stream 实例索引/计数
在 spring-cloud-stream 中,有没有办法在不重新启动的情况下更改应用程序的实例计数和实例索引?
另外,有没有推荐的方法来自动填充这些值?在微服务世界中,这似乎非常困难,因为服务一直在启动和停止。
apache-kafka - 无法识别的标记“ÿ”:期待(“真”、“假”或“空”)
我正在尝试向 Kafka 发送消息并使用 Spring 云流从 Kafka 消费相同的消息。我正在使用 Postman 将 JSON String { "acctNo" : "32432", "tn" : "3234" } 发送到我的生产者的其余控制器。我得到 JSON Parserexception :
我的代码是这样的:
生产者应用程序 yaml
生产者休息控制器
生产者代码
消费者应用程序 Yaml
消费者守则
领域类
这是什么原因造成的?请帮忙。
java - o.apache.kafka.common.network.Selector:使用 localhost/127.0.0.1 的 I/O 出错
我的应用程序使用来自一台机器上运行的 Kafka 服务器的消息,然后将它们转发到另一个在其他实例上运行的远程 Kafka。在我将应用程序部署到 Cloud Foundry 并向第一台 Kafka 服务器发送消息后,应用程序按预期工作。消息被消费并转发到远程 Kafka。
但是,在那之后,我在 Cloud Foundry 中得到了以下异常的无限循环(在我的本地机器上也以较慢的速度):
堆栈跟踪:
我的应用程序 yaml 文件是这样的
应用 YML:
我观察到,如果我包含下面的配置,错误就会消失,但现在我有一个无限循环的消息被消耗和发送。
片段:
我需要做什么来停止这个无限循环?
嗨 Marius, 感谢您回复 SOS 电话。我对上述问题进行了改进。流程现在从 a1p(topic:test) 消费,如果消息有效,则转发到 a3p(topic:test),否则将错误消息发送到 a1p(topic:errorMsgQueue)。我有以下应用程序。.yml 文件
弹簧:云:流:绑定:errorMsgQueue:binder:kafka1 目的地:errorMsgQueue contentType:application/json 输入:binder:kafka2 内容类型:application/x-java-object;type=com.comcast.activation.message.vo。 ActivationDataInfo 目的地:测试组:prac
activationMsgQueue: binder: kafka3 目的地: 测试 contentType: application/json binders: kafka1: type: kafka environment: spring: cloud: stream: kafka: binder: brokers: caapmsg-as-a1p.sys.comcast.net zk-nodes: caapmsg -as-a1p.sys.comcast.net kafka2: 类型: kafka 环境: spring: cloud: stream: kafka: binder: brokers: caapmsg-as-a3p.sys.comcast.net zk-nodes: caapmsg-as-a3p。 sys.comcast.net kafka3: 类型: kafka 环境: spring: cloud:流:kafka:binder:代理:caapmsg-as-a1p.sys.comcast.net zk-nodes:caapmsg-as-a1p.sys.comcast.net 默认绑定器:kafka2
我仍然得到一个无限循环。我究竟做错了什么?
spring-cloud-stream - spring-cloud-stream 生产者事务性
我对 kafka binder 进行了一些测试,似乎 spring-cloud-stream 生产者不参与 spring 管理的事务。
给定代码
customerDao.insertCustomer 调用被回滚,但仍然发送了 kafka 消息。如果我有客户事件的消费者将客户插入数据仓库,则数据仓库和记录系统将在事务回滚时不同步。有没有办法让卡夫卡活页夹在这里交易?
spring - 如何从基于 spring-cloud-stream 的微服务调用 python 脚本
我正在考虑使用 spring-cloud-stream 来构建使用 RabbitMQ 作为代理的消息驱动的微服务应用程序。
我看到 spring-cloud-stream 是建立在 Spring-Integration(SI) 之上的,并且 SI 提供 spring-integration-scripting 来执行 python 脚本。但我不知道如何将它连接到 spring-cloud-stream 中。
所以我想知道的是当消息到达队列时,如何依次执行python脚本并获取脚本的输出?
任何代码和配置示例都会有所帮助。
希望有人可以提供帮助。
谢谢总经理
apache-kafka - 手动确认消息:Spring Cloud Stream Kafka
我想要实现的场景是使用来自 Kafka 的消息,处理它,如果某些条件失败,我不希望确认消息。为此,我在 spring cloud stream 参考文档中找到,
autoCommitOffset 是否在处理消息时自动提交偏移量。如果设置为 false,则消息头中将提供一个确认头以用于延迟确认。
默认值:真。
我的问题是将 autoCommitOffset 设置为 false 后,我如何确认消息?一个代码示例将不胜感激。
java - 没有为依赖找到 [com.comcast.activation.message.interfaces.MessageChannels] 类型的合格 bean:
我有一个 spring boot 应用程序,它只是将消息发送到 kafka 主题。代码看起来像这样。
界面 :
制片人:
此应用程序作为独立应用程序运行良好。当我从中创建一个 jar 并将其包含到另一个 Spring Boot 应用程序中,以便我可以使用生产者向主题发送消息并运行 Spring Boot 应用程序时,我得到以下异常:
无法实例化 SpringCLoudStreamClient.,原因是
:没有为依赖项找到类型 [com.comcast.activation.message.interfaces.MessageChannels] 的合格 bean:预计至少有 1 个 bean 有资格作为此依赖项的自动装配候选者。
我得到以下代码行的异常
我使用了组件扫描来确保 jar 中的包被父 Spring Boot 应用程序扫描。这样做之后,我得到了上述异常。启用绑定注释没有做它应该做的事情,即在这个场景中创建一个消息通道接口的实现。这是一个错误还是我错过了什么?
java - Turbine 使用 docker 时只能找到一台主机
我有 3 个项目:一个 hystrix 仪表板、一个涡轮服务器(使用 AMQP)和一个 API
当我开始开发环境时,我设置了 2 个 API 实例(使用端口 8080 和 8081)。为了测试涡轮机聚合,我拨打了电话,在仪表板中,我可以看到Hosts: 2
.
虽然当我使用 Docker 时,即使负载均衡器命中 2 台服务器,我也只能在 hystrix 仪表板上看到一台主机。
我的假设:
1-因为两个容器都在同一个端口(8080)上启动,Turbine 将它们视为一个
2-因为我也对 RabbitMQ 进行了 dockerize,这可能会导致问题
这是我的docker-compose.yml
文件
我的persona_api
配置文件
恐怕如果我将它部署到生产环境(在 Rancher 或 Docker 云上),我会看到同样的问题。
这是我设置两个负载平衡的 API 时发生的事情的 GIF
spring - 具有反应流的 Spring 云流
我的目标是使用 Spring Cloud Stream 和 Kafka 制作应用程序,并发现“反应式”世界。我有一些有用的东西。这是我的消费者的一部分。在我的 pom 中,我声明:
那是我使用的 1.0.0.RELEASE。
我已经声明了我的频道
然后是我的服务
我很好地收到了我的信息。我在依赖项中看到 spring-integration-kafka 依赖于 reactor-core。让我的应用程序“反应”就足够了吗?我应该怎么做才能应用反应式编程风格?
我是否必须使用@EnableRxJavaProcessor,如果是,我不明白如何。
如果我不清楚,请不要犹豫,在评论中写出来。谢谢
java - Kafka 订阅者仅获得备用消息
我有一个主题汽车,只有 1 个分区和 1 个复制因子,如下所示:
Topic:cars PartitionCount:1 ReplicationFactor:1 Configs:
Topic: cars Partition: 0 Leader: 0 Replicas: 0 Isr: 0
我有两个经纪人在运行,一个在 9092 上,另一个在我的本地主机上的 9093 上。
我的 java 应用程序向主题汽车发送消息。我看到一个非常奇怪的行为。
- 我的 Java 应用程序只接收备用消息,例如消息 #1、消息 #3.. 等等
为了调试这个问题,我启动了一个控制台消费者。现在我只能看到我的 java 应用程序没有收到的那些消息。例如,消息 #2、消息 #4 .. 等等
很明显,我的消息已正确发布到主题,那么,是什么导致我的 java 应用程序只接收备用消息?
最后,为什么消费者控制台只展示上述行为?