在对数据执行聚合后,我正在使用 pyspark 将 kafka 流定向到 redis。最终输出是流式数据流。
我连接到 kafka 流的代码。(您可能会发现我的代码是外行工作,请忽略)
app_schema = StructType([
StructField("applicationId",StringType(),True),
StructField("applicationTimeStamp",StringType(),True)
])
# group_id = "mygroup"
topic = "com.mobile-v1"
bootstrap_servers = "server-1:9093,server-2:9093,server-3:9093"
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user@stream.com" password="xxxxx";',\
"kafka.ssl.ca.location": "/tmp/cert.crt",\
"kafka.sasl.mechanism": "PLAIN",\
"kafka.security.protocol" : "SASL_SSL",\
"kafka.bootstrap.servers": bootstrap_servers,\
"failOnDataLoss": "false",\
"subscribe": topic,\
"startingOffsets": "latest",\
"enable.auto.commit": "false",\
"auto.offset.reset": "false",\
"enable.partition.eof": "true",\
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_mobile_apps_df = spark.readStream.format("kafka").options(**options).options().load()
kafka_mobile_apps_df = kafka_mobile_apps_df\
.select(from_json(col("value").cast("string"), app_schema).alias("mob_apps"))
作为订阅经纪人,这给了我流数据框。在此之后,我将数据聚合到 count_df,如图所示
count_df = kafka_mobile_apps_df.withColumn("diff_days", ((col("TimeStamp_")) - (col("TimeStamp")))/(60.0*60.0*24))\
.withColumn("within_7d_ind", when(col("diff_days") < 7.0, 1).otherwise(0))\
.groupBy("_applicationId")
.agg(sum(col("within_7d_ind")).alias(feature+"_7day_velocity"))
现在我正在尝试将此 count_df 流写入 redis。在我的 resreach 之后,我发现我可以使用“spark-redis_2.11”进行 spark-redis 连接。
我不知道 scala,我找到了一个带有 scala 的 spark-redis github 示例。有人可以帮助在pyspark中写入以将这个count_df写入带有身份验证的redis的确切方法是什么
请在这里找到 spark-redis github
我已经在集群上安装了所需的 jar “com.redislabs:spark-redis_2.12:2.5.0”。
谢谢。
刚发现他们还不支持python,请告诉我还有其他方法可以写吗?