0

我正在尝试在 Scala 程序中插入 ElasticSearch(ES)。

在 build.sbt 我添加了

libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "7.5.2" ,
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "7.5.2"

我的代码是

val client = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost", 9200, "http")))

编译时出现如下错误

not found: type RestHighLevelClient
not found: value RestClient

我错过了一些进口吗?我的目标是从 Flink 获取流并插入 ElasticSearch 非常感谢任何帮助。

4

2 回答 2

0

要将 Elasticsearch 与 Flink 一起使用,使用 Flink 会更容易ElasticsearchSink,而不是RestHighLevelClient直接使用。但是,用于 Elasticsearch 7.x 的接收器版本即将在 Flink 1.10 中发布,但尚未发布(它很快就会发布;RC1 已经发布)。

使用这个连接器需要一个额外的依赖,例如flink-connector-elasticsearch6_2.11(或flink-connector-elasticsearch7_2.11Flink 1.10 附带的)。

请参阅将Elasticsearch 与 Flink 结合使用的文档

之所以喜欢 Flink 的 sink 而不是RestHighLevelClient自己使用的原因是 Flink sink 发出批量请求、处理错误和重试,并且它与 Flink 的 checkpointing 机制绑定在一起,因此它能够保证如果发生故障不会丢失任何内容。

至于您的实际问题,也许您需要添加

libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-client" % "7.5.2"
于 2020-02-05T10:06:38.037 回答
0

我们不需要单独使用这些依赖项来使用 Flink Streaming 在 Elasticsearch 中插入数据。

libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "7.5.2" ,
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "7.5.2" 

只需使用这个 flink-connector-elasticsearch7 或 flink-connector-elasticsearch6

libraryDependencies += "org.apache.flink" %% "flink-connector-elasticsearch7" % "1.10.0" 

Elasticsearch 的所有依赖项都与 Flink-Elastic 连接器一起提供。所以我们不需要将它们单独包含在build.sbt文件中。

Flink Elasticsearch 的 build.sbt 文件

name := "flink-streaming-demo"

scalaVersion := "2.12.11"

val flinkVersion = "1.10.0"

libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-elasticsearch7" % flinkVersion

有关更多详细信息,请查看我在此处提供的这个工作 Flink-Elasticsearch 代码。

注意:从 Elastic 6.x 开始,他们开始全面支持 REST 弹性客户端。直到 Elastic5.x,他们一直在使用 Transport 弹性客户端。

于 2020-05-07T09:11:24.070 回答