1

我有一个 JSON 文件,不幸的是每行前面都有一些不需要的文本:

2019-07-02T22:53:16.848Z LOGFILE {"key":{"host":"example1.net","srcIP":"1.0.0.0","dstIp":"2.0.0.0"},"count":4,"last_seen":"2019-07-02T22:48:15.362Z"}
2019-07-02T22:53:16.937Z LOGFILE {"key":{"host":"example2.net","srcIP":"1.0.0.1","dstIp":"2.0.0.1"},"count":2,"last_seen":"2019-07-02T22:53:07.018Z"}
......

我想按如下方式加载此文件:

from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
       .builder \
       .appName("LogParser") \
       .getOrCreate()

sc = spark.SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.read.json('log_sample.json')

但需要一种方法来删除那些不需要的文本,例如,2019-07-02T22:53:16.848Z LOGFILE首先使其成为有效的 JSON。在我打电话之前,你能解释一下如何应用正则表达式sqlContext.read.json()吗?否则它会抱怨它是一个_corrupt_record. 非常感谢!

4

2 回答 2

1

在这种情况下,您必须使用 textFile 加载整个文件,然后将字符串拆分为 json,然后从中创建 DataFrame。

下面的片段可能很有用


log_path = './log.txt'

# Load each line to pair rdd
pair_rdd = sc.textFile(log_path)

# Split str from pair rdd and create new rdd of json string
# You can do more thing with it
json_rdd = pair_rdd.map(lambda x: x.split(' LOGFILE ')[1])

# Convert json rdd to DF
original_df = hive_context.read.json(json_rdd)

original_df.printSchema()
于 2019-07-06T06:39:20.600 回答
0

读取以空格分隔的文件,删除前 2 列并再次保存到 json。试试这是否适合你。

df=spark.read.csv("file.json",sep=" ").drop("_c0","_c1")

df.write.json("yourjasonfile.json")
于 2019-07-06T14:59:18.317 回答