0

好像 MQTTUtils 只提供了三个方法, def createStream(jssc: JavaStreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel): JavaDStream[String]

创建一个接收 MQTT 发布者推送的消息的输入流。def createStream(jssc:JavaStreamingContext,brokerUrl:字符串,主题:字符串):JavaDStream [字符串]

创建一个接收 MQTT 发布者推送的消息的输入流。def createStream(ssc: StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[String]

创建一个接收 MQTT 发布者推送的消息的输入流。

但是,如果代理启用了身份验证,我如何提供用户名和密码?

4

3 回答 3

0

您可以尝试在 url 中包含用户名和密码:

mqtt://用户名:密码@主机:端口

于 2015-01-05T10:58:16.907 回答
0

您可以尝试使用此处提供的自定义 spark-streaming-mqtt-connector 库 - https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0

该库在原始库之上添加了以下内容,

  • 添加了 TLS v1.2 安全性,以便始终保护通信。
  • 将主题与有效负载一起存储在 RDD 中。

因此,使用以下方法创建流,

val lines = MQTTUtils.createStream(ssc, // Spark Streaming Context
            "ssl://URL",                // Broker URL
            "<topic>",                 // MQTT topic
            "MQTT client-ID",          // Unique ID of the application
            "Username", 
            "passowrd")

有一些重载的构造函数也允许你传递 RDD 存储级别。希望这可以帮助。

于 2016-02-18T07:23:24.080 回答
0

请找到这个MQTT Scala Word Count Example

特别针对您的情况,将发布者运行为

bin/run-example org.apache.spark.examples.streaming.MQTTPublisher mqtt://username:password@host:port foo

和订阅者作为

bin/run-example org.apache.spark.examples.streaming.MQTTWordCount mqtt://username:password@host:port foo

在执行此操作之前,请确保您已启动 ActiveMQ 代理。

示例代码

import org.apache.activemq.broker.{TransportConnector, BrokerService}
.
.
.
.
def startActiveMQMQTTBroker() {
    broker = new BrokerService()
    broker.setDataDirectoryFile(Utils.createTempDir())
    connector = new TransportConnector()
    connector.setName("mqtt")
    connector.setUri(new URI("mqtt:" + brokerUri))
    broker.addConnector(connector)
    broker.start()
}

.pom 文件

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>
于 2015-07-30T22:16:55.577 回答