0

我尝试了很多选项,包括 withColumn、udf、lambda、foreach、map,但没有得到预期的输出。最大时,我只能转换第一条记录。inputfile.json 将继续增加,并且期望操作应该以所需的结构提供 xml。稍后我将在 Kafka 上制作预期的操作。火花 2.3,Python 2.7。需要在 PySpark 中做。

编辑1:

我可以在具有所需 xml 的主数据框中添加一列。我使用withColumn并且functions.format_string能够将字符串(xml 结构)添加到数据框的列中。

现在我的下一个目标是只为 Kafka 生成新列的值。我正在使用df.foreachPartition(send_to_kafka)并创建了如下功能:

def send_to_kafka(rows):
    kafka = SimpleClient('localhost:9092')
    producer = SimpleProducer(kafka)
    for row in rows:
        producer.send_messages('test', str(row.asDict()))

但不幸的是,它做了两件事
:在 Kafka 上生成记录为{'newColumn':u'myXMLPayload'}. 我不要那个。我只想myXMLPayload在 Kafka 上制作。
湾。它将 u' 添加到值以对值进行 unicoding。

我想摆脱这两个部分,我会很高兴。任何帮助,将不胜感激。

4

0 回答 0