问题标签 [spring-cloud-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
cloud-foundry - Spring Cloud Dataflow 有什么好处?
根据我所见,在 Spring Cloud Dataflow (SCDF) 中创建流将部署底层应用程序、绑定通信服务(如 RabbitMQ)、设置 Spring Cloud Stream 环境变量并启动应用程序。这一切都可以使用 cf push 命令轻松手动完成。
同时,我遇到了 Spring Cloud Dataflow 的一些缺点:
- SCDF 服务器是 PCF 上的内存占用者(我有一个只有 6 个应用程序的流,但我需要大约 10GB 的服务器空间)
- 应用程序命名、内存、实例等方面没有灵活性。(您通常会在 manifest.yml 中设置的所有内容)
- 与构建工具(如 Bamboo)的集成将需要额外的工作,因为我们必须使用 SCDF CLI 而不仅仅是 PCF CLI
- 无法修改现有流。要进行蓝绿部署,您必须手动部署应用程序(绑定服务并手动设置环境变量)。然后,一旦完成蓝绿部署,SCDF 将流显示为失败,因为它不知道底层应用程序之一已更改。
- 我遇到的各种错误,例如尝试重新部署失败的流时出现 MySQL 主键约束错误
那么我错过了什么?为什么使用 Spring Cloud Dataflow 有利于手动部署应用程序?
spring-cloud - Spring Cloud Stream 流作为一个应用程序
据我所知,有一个选项可以通过使用AggregateApplication
或使用 Spring Cloud Stream 的几个组件作为一个应用程序AggregateApplicationBuilder
。
据我了解,在这种情况下,spring 不会使用代理(Rabbit 或 Kafka)在步骤之间进行通信,它只会将上一步的结果作为参数几乎直接传递给下一个,对吗?
如果我是,是否有另一种方法可以使用代理在应用程序的一个实例中运行更多组件?我知道这不是一个非常适合 Cloud Stream 的架构,但现在我没有可以运行 Dataflow 的基础架构,而且我想使用代理的持久性。
spring-cloud-stream - 有没有办法在 Spring Cloud Data Flow 中配置 Kafka 客户端超时设置?
启动 Spring Cloud Data Flow 流时,由于各种 Kafka 相关错误,应用程序经常无法在我的机器上部署。例如:
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder
[...]
Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(xxx)] from broker [List()] failed
但我也看到了这个:
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
在这两种情况下,Kafka 进程(以及 ZooKeeper)都在运行,所以我假设已经通过了某种超时阈值。
通用配置和特定于Kafka 的配置似乎都没有提供任何超时选项。
有什么方法可以影响 Kafka 客户端放弃并假设代理已经离开的时间量?
java - spring cloud stream kafka:“输入”的重复@StreamListener映射
我有一个生产者(Source
)将消息写入 Kafka 输出通道,该生产者发送不同类型的对象。在消费者方面 ( Sink
) 我想@Streamlistener
在同一个输入通道上有多个方法,每个方法都有不同的参数类型,目前我得到它不起作用Duplicate @StreamListener mapping for 'input'
。
这是我的消费者代码:
有没有比使用单个@StreamListener
方法更好的解决方案来测试消息类型以选择要调用的方法?
java - Spring Cloud Stream Kafka > 使用来自 Confluent REST 代理的 Avro 消息
我有以下情况:
- 如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce中所述,生产者通过 Confluent 的 REST 代理(在 Confluent 的模式注册表上注册模式)向 Kafka 主题发送 Avro 编码消息-and-consume-avro-messages
- 启用 Spring Cloud Stream 的消息在主题上侦听新消息
我的应用程序如下所示:
而 MyMessage 是 Avro 从 Avro 模式创建的类。
我的 application.properties 看起来像这样:
我现在的问题是每次收到新消息时,都会抛出以下异常:
据我了解,问题在于 Confluent 堆栈包含消息模式的 ID 作为消息有效负载的一部分,并且客户端应该在模式 ID 之后开始读取实际的 Avro 消息。看来我需要配置 Kafka 绑定以使用 Confluent 的 KafkaAvroDeserializer,但我不知道如何实现这一点。
(我可以使用 Confluent 的 avro 控制台消费者完美地检索消息,因此 Avro 编码似乎不是问题)
我还尝试使用 @EnableSchemaRegistry 注释并配置 ConfluentSchemaRegistryClient bean,但在我看来,这仅控制模式的存储/检索位置,而不是实际的反序列化。
这甚至应该以某种方式工作吗?
spring-cloud - SpringCloudStream - AggregateApplicationBuilder 需要代理
我正在使用AggregateApplicationBuilder的第 3.1.6 节中描述的 Cloud Stream 。我按照这些部分运行应用程序以获取以下错误,该错误需要代理配置:
您能否在这里帮助我了解如何在传递参考指南中描述的信息时消除经纪人?
谢谢卡提克
java - spring cloud bus - 重命名rabbitmq队列?
有没有办法告诉 spring bus 重命名它的 rabbitmq 队列?在启动时,它们似乎只是一些随机值,如下所示:
springCloudBus.anonymous.4zzIP0z-TH6oIza5mCun7Q
试图让 spring bus 将其重命名为更易于阅读的可预测队列名称。例如:
testQueue
或者知道它为什么服务保存消息的东西。
我尝试将以下内容添加到 bootRun 上的 application.yml:
无济于事。请帮忙!!
spring-cloud - SpringCloudStream - RabbitMQ binder 的慢消费者
我有一个用例将 http POST 发送到使用 Spring Cloud Stream App Starter 的 http 源创建为 Spring Boot APP 的 HTTP 源。这个过程在发布 5k 记录/秒。我有一个接收器应用程序将数据保存到 Mongo DB。在应用程序中读取非常慢,为 20 条消息/秒。我正在使用以下属性,没有发现任何区别。我使用相同的前缀来加载属性 - spring.cloud.stream.rabbit.binder。你能告诉我如何在从 RabbitMQ binder 读取数据中实现并发吗?
应用程序属性
感谢并感谢您的帮助 Karthik
spring-cloud-stream - 用于函数式编程的 Spring Cloud Task - 如何真正实现它?
我们需要在我们的私有 Cloud Foundry 堆栈上按照 FRP(类似于 AWS Lambda)的方式开发按需应用程序。这是为了节省成本,否则总是运行低容量的应用程序。
仅当消息出现在我们的基于 JMS 的消息传递系统(例如 IBM MQ)上时,才需要触发/启动应用程序。作为低容量应用程序,它应该在所有其他时间保持停止。
根据对 Spring 文档的详细研究和搜索,在我看来,每当消息出现在源队列上时触发/启动我的微服务并在完成后将其关闭的理想解决方案将需要以下组合:Spring Cloud Data Flow + Spring Cloud Stream + Spring Cloud Task 作为技术栈。
在各种示例中,以下组合看起来像是在本地机器上启动 PoC 所需的组合:
https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/tasksink https://github.com/spring-cloud/spring-cloud-stream-samples /tree/master/sink https://github.com/spring-cloud/spring-cloud-dataflow/tree/master/spring-cloud-dataflow-server-local
但是,我仍然不确定如何将其全部插入并取得结果。
取得了一些进展,但构建此解决方案的完全清晰性仍不清楚。
据我所知,我们似乎需要...
- 运行 Spring Cloud DataFlow 服务器
- 运行 Spring Cloud DataFlow Shell 或 UI
- 创建 Spring Cloud Stream sink 应用程序并运行它(我们是否也需要在 Maven 中安装它?)
- 创建 Spring Cloud Task sink 应用程序并将其安装在 Maven Repo 中
- 我还需要创建一个 Spring Cloud Task 吗?
- 注册模块(但是这种情况下的模块是什么?)
- 在 DF Server 中创建 Stream 并在其上部署
- 更多的东西?
...实现结果。
有人可以在这方面帮助我并建议这里是否遗漏了任何构建块?如果上述问题的答案是已知的。
我的理解也有任何差距,我是否足够正确地假设它@EnableTask
会使我的应用程序仅在触发时运行并保持它在其他情况下停止?
spring-cloud - SpringCloudStream-HttpSource 返回 404
我正在开发 SpringCloudStream 的 Brooklyn.Release 版本。我的用例有多个接收器的 HttpSource。当我将 Starter App 依赖项添加到应用程序并使用它时,如下所示:
我的聚合应用程序是
一直得到以下响应:
为 HttpSourceConfiguration(开箱即用)添加了 ComponentScan;但没有成功。
如果我将相同的 SourceApplication 与 Rabbit Binder 一起使用,它会按预期工作。你能指导我完成这项工作吗?
感谢您的帮助和时间。
问候卡提克