Using this dataset:
start,end,rms,state,maxTemp,minTemp
2019-02-20T16:16:31.752Z,2019-02-20T17:33:34.750Z,4.588481,charge,35.0,32.0
2019-02-20T17:33:34.935Z,2019-02-20T18:34:49.737Z,5.770562,discharge,35.0,33.0
and this one:
[{"EventDate":"2019-02-02T16:17:00.579Z","Value":"23"},
{"EventDate":"2019-02-02T16:18:01.579Z","Value":"23"},
{"EventDate":"2019-02-02T16:19:02.581Z","Value":"23"},
{"EventDate":"2019-02-02T16:20:03.679Z","Value":"23"},
{"EventDate":"2019-02-02T16:21:04.684Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:05.693Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:06.694Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:07.698Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:08.835Z","Value":"23"}]
schema = StructType([
    StructField('EventDate', TimestampType(), True),
    StructField('Value', FloatType(), True)
])
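For context, csvDf is the CSV above and jsondata is the JSON with that schema applied; jsondata is read the same way as dfTemperatureCast further down, and the CSV is loaded roughly like this (the path is a placeholder):

from pyspark.sql.functions import col, to_timestamp

# Read the cycle data and cast the string columns to usable types
csvDf = spark.read.option("header", "true").csv("cycles.csv") \
    .withColumn("start", to_timestamp("start")) \
    .withColumn("end", to_timestamp("end")) \
    .withColumn("rms", col("rms").cast("float"))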
I want to add the maximum and minimum values from the JSON dataset as columns to the CSV dataset.
Here is what I tried:
cyclesWithValues = csvDf \
    .withColumn("max", jsondata.filter((col("EventDate") >= csvDf.start) & (col("EventDate") <= csvDf.end)).agg({"value": "max"}).head()["max(Value)"]) \
    .withColumn("min", jsondata.filter((col("EventDate") >= csvDf.start) & (col("EventDate") <= csvDf.end)).agg({"value": "min"}).head()["min(Value)"])
But I get this error:
AnalysisException: 'Resolved attribute(s) start#38271,end#38272 missing from EventDate#38283,Value#38286 in operator !Filter ((EventDate#38283 >= start#38271) && (EventDate#38283 <= end#38272)).;;\n!Filter ((EventDate#38283 >= start#38271) && (EventDate#38283 <= end#38272))\n+- Project [EventDate#38283, cast(Value#38280 as float) AS Value#38286]\n +- Project [to_timestamp(EventDate#38279, None) AS EventDate#38283, Value#38280]\n +- Relation[EventDate#38279,Value#38280] json\n'
I have an array-based solution, but it seems slow, so I was hoping something like this could speed things up.
This is the solution I'm using right now:
from multiprocessing.pool import ThreadPool
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, TimestampType, FloatType, StringType

dfTemperature = spark.read.option("multiline", "true").json("path")
dfTemperatureCast = dfTemperature \
    .withColumn("EventDate", to_timestamp(dfTemperature.EventDate)) \
    .withColumn("Value", dfTemperature.Value.cast('float'))

def addValuesToDf(row):
    # Each call runs two Spark jobs against the full temperature data,
    # which is why this approach is slow
    temperatures = dfTemperatureCast.filter(
        (col("EventDate") >= row["start"]) & (col("EventDate") <= row["end"]))
    maxTemp = temperatures.agg({"Value": "max"}).head()["max(Value)"]
    minTemp = temperatures.agg({"Value": "min"}).head()["min(Value)"]
    return (row.start, row.end, row.rms, row.state, maxTemp, minTemp)

# One driver-side query per cycle, ten at a time
pool = ThreadPool(10)
withValues = pool.map(addValuesToDf, rmsDf.collect())

schema = StructType([
    StructField('start', TimestampType(), True),
    StructField('end', TimestampType(), True),
    StructField('rms', FloatType(), True),
    StructField('state', StringType(), True),
    StructField('maxTemp', FloatType(), True),
    StructField('minTemp', FloatType(), True)
])
cyclesDF = spark.createDataFrame(withValues, schema)
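What I think I want is to replace the per-row loop with a single range join and one aggregation. A rough, untested sketch of what I have in mind (assuming csvDf has the start, end, rms, and state columns as above):

from pyspark.sql import functions as F

# Attach each temperature reading to the cycle whose [start, end] window contains it
joined = csvDf.join(
    dfTemperatureCast,
    (dfTemperatureCast.EventDate >= csvDf.start) &
    (dfTemperatureCast.EventDate <= csvDf.end),
    how="left")

# Aggregate all cycles in one distributed job instead of one query per row
cyclesDF = joined.groupBy("start", "end", "rms", "state").agg(
    F.max("Value").alias("maxTemp"),
    F.min("Value").alias("minTemp"))

Would something along these lines be the faster way to do this, or is there a better pattern for interval aggregations in Spark?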