0

我创建了一个 Debezium Embedded 引擎来捕获 MySQL 更改数据。我想尽快提交补偿。在代码中,创建的配置包括以下内容。

.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())

运行此返回,java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)

但是,当我使用 启动嵌入式引擎时, .with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())嵌入式引擎工作正常。

请注意,类OffsetCommitPolicy.PeriodicCommitOffsetPolicy构造函数包含 config 参数,而OffsetCommitPolicy.AlwaysCommitOffsetPolicy没有。

public PeriodicCommitOffsetPolicy(Configuration config) { ... }

如何让 debezium 嵌入式引擎使用它AlwaysCommitOffsetPolicy

4

2 回答 2

0

使用1.4.0Final版本测试:

new EmbeddedEngine.BuilderImpl() // create builder
        .using(config) // regular config
        .using(OffsetCommitPolicy.always()) // explicit commit policy
        .notifying(this::handleEvent) // even procesor
        .build(); // and finally build!
于 2021-02-02T04:54:38.067 回答
0

感谢您的报告。这部分是错误(如果您可以登录我们的 Jira,我们将不胜感激)。您可以通过调用专用方法嵌入式引擎构建器来解决此问题,例如 `io.debezium.embedded.EmbeddedEngine.create().with(OffsetCommitPolicy.always())'

于 2018-10-11T04:18:34.330 回答