我一直致力于在 Scala 中为 Spring Cloud Data Flow 创建一个简单的自定义处理器,并且遇到了从/向启动应用程序发送/接收数据的问题。我一直无法看到通过流传播的任何消息。流的定义是我的自定义处理器time --trigger.time-unit=SECONDS | pass-through-log | log
在哪里。pass-through-log
我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。
这是用于处理器的代码 - 我正在使用功能模型。
@SpringBootApplication
class PassThroughLog {
@Bean
def passthroughlog(): Function[String, String] = {
input: String => {
println(s"Received input `$input`")
input
}
}
}
object PassThroughLog {
def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}
应用程序.yml
spring:
cloud:
stream:
function:
bindings:
passthroughlog-in-0: input
passthroughlog-out-0: output
构建.gradle.kts
// scala
implementation("org.scala-lang:scala-library:2.12.10")
// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")
如果此处缺少代码示例,我已将整个项目发布到github 。我还在那里发布了日志,因为它们很长。
当我引导本地 Kafka 集群并将任意数据推送到input
主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在 Kubernetes 中通过 Docker 部署应用程序。
此外,当我使用定义部署流时time --trigger.time-unit=SECONDS | log
,我会在日志接收器中看到消息。这让我确信问题出在定制处理器上。
我是否缺少一些简单的东西,例如依赖项或额外配置?任何帮助是极大的赞赏。