I am trying to write data from Spark Structured Streaming 2.2.1 to Elasticsearch (ES) 6.1.1 through ES-Hadoop; the cluster has X-Pack installed. This is my code (the index already exists in Elasticsearch):
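import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Read the input files as a stream of text lines
val exceptions = spark
  .readStream
  .text(path)
// Configure the Elasticsearch sink with a 10-second trigger
val advancedQuery = exceptions
  .writeStream
  .format("org.elasticsearch.spark.sql")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .option("checkpointLocation", "/checkpoint")
// Start writing to the index/type "spark/exc" and block until the query stops
val runningQuery = advancedQuery.start("spark/exc")
runningQuery.awaitTermination()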
But I get the following exception:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:327)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:575)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null**
How do I set the required authentication data?
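For reference, ES-Hadoop documents the settings `es.net.http.auth.user` and `es.net.http.auth.pass` for HTTP basic authentication, so one thing that may be worth trying is passing them as options on the stream writer. The following is only a sketch under assumptions, not a confirmed fix: the host, port, user name, password, and input path are hypothetical placeholders, and the object name `EsAuthSketch` is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

object EsAuthSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es-auth-sketch").getOrCreate()

    // Assumed values: replace with the real node address and X-Pack credentials.
    val esOptions = Map(
      "es.nodes"              -> "localhost",
      "es.port"               -> "9200",
      "es.net.http.auth.user" -> "elastic",   // hypothetical user
      "es.net.http.auth.pass" -> "changeme"   // hypothetical password
    )

    // Hypothetical input directory; the original code uses a variable named `path`.
    val exceptions = spark.readStream.text("/path/to/input")

    val runningQuery = exceptions.writeStream
      .format("org.elasticsearch.spark.sql")
      .options(esOptions)                      // basic-auth settings passed to the sink
      .trigger(Trigger.ProcessingTime(10.seconds))
      .option("checkpointLocation", "/checkpoint")
      .start("spark/exc")

    runningQuery.awaitTermination()
  }
}
```

The same keys can probably also be set once on the Spark configuration with a `spark.` prefix (for example `spark.es.net.http.auth.user`), which keeps credentials out of the query code; whether that is preferable depends on how the job is deployed.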