1

我一直在阅读大量 Spring Cloud DataFlow 和相关文档,以便生成将在我组织的 Cloud Foundry 部署中运行的数据摄取解决方案。目标是轮询 HTTP 服务以获取数据,为了讨论可能每天 3 次,然后在 PostgreSQL 数据库中插入/更新该数据。HTTP 服务似乎每天提供数十万条记录。

到目前为止,令人困惑的一点是在 DataFlow 管道上下文中对轮询记录进行重复数据删除的最佳实践。源数据没有时间戳字段来帮助跟踪轮询,只有粗略的日级别日期字段。我也不能保证不会追溯更新记录。这些记录似乎有一个唯一的 ID,因此我可以通过这种方式对记录进行重复数据删除,但我只是不确定根据文档如何最好地在 DataFlow 中实现该逻辑。据我所知,Spring Cloud Stream 启动器不提供这种开箱即用的功能。我正在阅读有关 Spring Integration 的smart polling的信息,但我不确定这是否也能解决我的担忧。

我的直觉是在 DataFlow Stream 中创建一个自定义处理器 Java 组件,该组件执行数据库查询以确定是否已插入轮询记录,然后将适当的记录插入目标数据库,或者将它们传递到流的下游。在 Stream 应用程序中是否可以接受在中间步骤中查询目标数据库?或者,我可以在 Spring Cloud Task 中将这一切作为一个批处理操作来实现,该操作基于某个计划触发。

处理 DataFlow 应用程序的最佳方法是什么?如我在 DataFlow/Stream/Task/Integration 应用程序中所述,实现重复数据删除的常见/最佳实践是什么?我应该复制入门应用程序的设置还是从头开始,因为我相当确定我需要编写自定义代码?我什至需要 Spring Cloud DataFlow,因为我不确定我是否会使用它的 DSL?对所有问题表示歉意,但是对于 Cloud Foundry 和所有这些 Spring 项目来说是新手,将它们拼凑在一起是令人生畏的。

提前感谢您的帮助。

4

2 回答 2

2

您走在正确的轨道上,鉴于您的要求,您很可能需要创建一个自定义处理器。您需要跟踪已插入的内容以避免重复。

没有什么可以阻止您在流应用程序中编写这样的处理器,但是性能可能会受到影响,因为对于每条记录,您都会发出一个数据库查询。

如果顺序不重要,您可以并行化查询,以便处理多个并发消息,但最终您的数据库仍会付出代价。

另一种方法是使用布隆过滤器,它可以极大地帮助您加快检查插入记录的速度。

您可以从克隆启动应用程序开始,您可以让轮询器触发一个 http 客户端处理器来获取您的数据,然后通过您的自定义代码处理器,最后到达 jdbc-sink。就像是stream create time --triger.cron=<CRON_EXPRESSION> | httpclient --httpclient.url-expression=<remote_endpoint> | customProcessor | jdbc

使用 SCDF 的优点之一是您可以通过部署属性独立扩展自定义处理器,例如deployer.customProcessor.count=8

于 2017-08-14T15:35:06.553 回答
1

Spring Cloud Data Flow 为基于 Spring Cloud Stream 的数据构建集成流,而 Spring Cloud Stream 又完全基于 Spring Integration。Spring Integration 中存在的所有原则都可以在 SCDF 级别的任何地方应用。

这确实可能是您无法避免一些编码的情况,但您需要的是在 EIP Idempotent Receiver中调用。Spring Integration为我们提供了一个:

    @ServiceActivator(inputChannel = "processChannel")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public void handle(Message<?> message)
于 2017-08-14T15:35:49.917 回答