我尝试了很多选项,包括 withColumn、udf、lambda、foreach、map,但没有得到预期的输出。最大时,我只能转换第一条记录。inputfile.json 将继续增加,并且期望操作应该以所需的结构提供 xml。稍后我将在 Kafka 上制作预期的操作。火花 2.3,Python 2.7。需要在 PySpark 中做。
编辑1:
我可以在具有所需 xml 的主数据框中添加一列。我使用withColumn
并且functions.format_string
能够将字符串(xml 结构)添加到数据框的列中。
现在我的下一个目标是只为 Kafka 生成新列的值。我正在使用df.foreachPartition(send_to_kafka)
并创建了如下功能:
def send_to_kafka(rows):
kafka = SimpleClient('localhost:9092')
producer = SimpleProducer(kafka)
for row in rows:
producer.send_messages('test', str(row.asDict()))
但不幸的是,它做了两件事
:在 Kafka 上生成记录为{'newColumn':u'myXMLPayload'}
. 我不要那个。我只想myXMLPayload
在 Kafka 上制作。
湾。它将 u' 添加到值以对值进行 unicoding。
我想摆脱这两个部分,我会很高兴。任何帮助,将不胜感激。