0

我一直致力于在 Scala 中为 Spring Cloud Data Flow 创建一个简单的自定义处理器,并且遇到了从/向启动应用程序发送/接收数据的问题。我一直无法看到通过流传播的任何消息。流的定义是我的自定义处理器time --trigger.time-unit=SECONDS | pass-through-log | log在哪里。pass-through-log

我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。

这是用于处理器的代码 - 我正在使用功能模型。

@SpringBootApplication
class PassThroughLog {

  @Bean
  def passthroughlog(): Function[String, String] = {
    input: String => {
      println(s"Received input `$input`")
      input
    }
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}

应用程序.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output

构建.gradle.kts

// scala
implementation("org.scala-lang:scala-library:2.12.10")

// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")

如果此处缺少代码示例,我已将整个项目发布到github 。我还在那里发布了日志,因为它们很长。

当我引导本地 Kafka 集群并将任意数据推送到input主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在 Kubernetes 中通过 Docker 部署应用程序。

此外,当我使用定义部署流时time --trigger.time-unit=SECONDS | log,我会在日志接收器中看到消息。这让我确信问题出在定制处理器上。

我是否缺少一些简单的东西,例如依赖项或额外配置?任何帮助是极大的赞赏。

4

2 回答 2

0

在 SCDF 中使用 Spring Cloud Stream 3.x 版本时,您必须设置一个附加属性以让 SCDF 知道哪些通道绑定配置为输入和输出通道。

请参阅:功能应用程序

请特别注意以下属性:

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=输出

app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=输入

在您的情况下,您将必须分别映射passthroughlog-in-0passthroughlog-out-0函数绑定。inputoutput

于 2020-06-11T04:47:10.273 回答
0

原来问题出在我的 Dockerfile 上。为了便于配置,我有一个 build 参数来指定ENTRYPOINT. 为此,我使用了 shell 版本的ENTRYPOINT. 将我的更改ENTRYPOINT为 exec 版本解决了我的问题。

的 shell 版本ENTRYPOINT不能很好地处理图像参数 ( docker run <image> <args>),因此 SCDF 无法将适当的参数传递给容器。

从以下位置更改我的 Dockerfile:

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]

解决了这个问题。

于 2020-06-12T05:00:57.087 回答