3

我有 2 个具有相同架构的数据帧,我需要比较数据帧的行并保留两个数据帧中至少有一列值为 1 的行数

现在我正在制作一个行列表,然后比较这两个列表以查找即使一个值在两个列表中都相等并且等于 1

rowOgList = []
for row in cat_og_df.rdd.toLocalIterator():
    rowOgDict = {}
    for cat in categories:
        rowOgDict[cat] = row[cat]
    rowOgList.append(rowOgDict)

#print(rowOgList[0])

rowPredList = []
for row in prob_df.rdd.toLocalIterator():
    rowPredDict = {}
    for cat in categories:
        rowPredDict[cat] = row[cat]
    rowPredList.append(rowPredDict)

但是在这里,当我在一个巨大的数据集上尝试它时,函数 rdd.tolocalIterator 给了我一个堆空间错误。例如:这是第一个数据帧

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      1|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      1|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
+-------+-------+-------+-------+

这是第二个数据框

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
+-------+-------+-------+-------+

这里第 9,11,17,18 行至少有一列具有相同的值,并且该值为 1,所以这里的计数 = 4

这可以以任何优化的方式完成,谢谢。

4

2 回答 2

2

注意:正如 pault 所提到的,如果您有连接两个数据帧的唯一行索引,这将更好地工作。否则,在某些 Spark 操作中可能无法保证行顺序。

(1)设置环境和一些样本数据。

import numpy as np

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

df1 = spark.createDataFrame([
    (0, 0, 1),
    (1, 0, 0),
    (0, 0, 1)
], ["column1", "column2", "column3"])

df2 = spark.createDataFrame([
    (0, 0, 0),
    (1, 0, 1),
    (0, 0, 1)
], ["column1", "column2", "column3"])

(2)将所有列收集到一个 Spark 向量中。

assembler = VectorAssembler(
    inputCols=["column1", "column2", "column3"],
    outputCol="merged_col")

df1_merged = assembler.transform(df1)
df2_merged = assembler.transform(df2)
df1_merged.show()

+-------+-------+-------+-------------+
|column1|column2|column3|   merged_col|
+-------+-------+-------+-------------+
|      0|      0|      1|[0.0,0.0,1.0]|
|      1|      0|      0|[1.0,0.0,0.0]|
|      0|      0|      1|[0.0,0.0,1.0]|
+-------+-------+-------+-------------+

(3)获取非零元素的行列索引。在 Spark Vector 的 RDD 上使用 numpy.nonzero()。

def get_nonzero_index(args):
    (row, index) = args
    np_arr = np.array(row.merged_col)
    return (index, np_arr.nonzero()[0].tolist())

df1_ind_rdd = df1_merged.rdd.zipWithIndex().map(get_nonzero_index)
df2_ind_rdd = df2_merged.rdd.zipWithIndex().map(get_nonzero_index)
df1_ind_rdd.collect()
[(0, [2]), (1, [0]), (2, [2])]

df2_ind_rdd.collect()
[(0, []), (1, [0, 2]), (2, [2])]

(4)然后您可以轻松地对这两个 Python 列表进行比较。

collect请注意,如果您拥有的行数非常大,此方法将不会有效(由于)。在这种情况下,您将希望通过对 2 个数据帧进行连接来完成 Spark 中的所有处理。

(5)要纯粹在 Spark 中进行匹配,您可以尝试以下依赖于行索引连接的方法。

df1_index = spark.createDataFrame(df1_ind_rdd, ["row_index_1", "column_index_1"])
df2_index = spark.createDataFrame(df2_ind_rdd, ["row_index_2", "column_index_2"])

df_joined = df1_index.join(df2_index, df1_index.row_index_1 == df2_index.row_index_2)

然后展开列表,以便我们在每一行上获得一个元素。

df_exploded = df_joined.withColumn("column_index_exp_1", F.explode(df_joined.column_index_1))\
                            .withColumn("column_index_exp_2", F.explode(df_joined.column_index_2))

检查两列之间是否匹配,最后转换为整数进行求和。

df_match_bool = df_exploded.withColumn("match_bool", df_exploded.column_index_exp_1 == df_exploded.column_index_exp_2)

df_match_int = df_match_bool.withColumn("match_integer", df_match_bool.match_bool.cast("long"))
df_match_bool.show()
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|row_index_1|column_index_1|row_index_2|column_index_2|column_index_exp_1|column_index_exp_2|match_bool|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|          1|           [0]|          1|        [0, 2]|                 0|                 0|      true|
|          1|           [0]|          1|        [0, 2]|                 0|                 2|     false|
|          2|           [2]|          2|           [2]|                 2|                 2|      true|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+

df_match_int.groupBy().sum("match_integer").collect()[0][0]
2
于 2019-09-09T09:46:44.620 回答
1

对于Spark 2.4和少量列,并且在处理整个数组时具有一定程度的性能损失,但并行处理。例如,Num cols 为 5。动态模式列定义。整理此处需要的声明 这种方法适用于任何值,只需要限制为 1。添加了过滤器。某些方法似乎不适用于较低版本的 Spark。测试了这个。

from pyspark.sql.functions import udf, col, split, arrays_zip, expr, lit
from pyspark.sql import functions as F
from pyspark.sql.types import * 
from pyspark.sql import Row

df1 = spark.createDataFrame([
(1, 1, 0, 0, 0),
(1, 0, 0, 0, 1),
(0, 0, 0, 0, 0)      ], ["column1", "column2", "column3", "column4", "column5"])
df2 = spark.createDataFrame([
(1, 1, 1, 1, 1),
(0, 1, 1, 1, 1),
(0, 0, 0, 0, 0)      ], ["column1", "column2", "column3", "column4", "column5"])

schema1 = StructType(df1.schema.fields[:] + [StructField("index1", LongType(), True)])
schema2 = StructType(df2.schema.fields[:] + [StructField("index2", LongType(), True)])
allCols = [x for x in df1.columns] # at this stage common to both DFs - df1 & df2

rdd1 = df1.rdd.zipWithIndex()
rdd2 = df2.rdd.zipWithIndex()
# All narrow transformations, so zipWithIndex should be fine  

rddA = rdd1.map(lambda row: tuple(row[0].asDict()[c] for c in schema1.fieldNames()[:-1]) + (row[1],))
dfA = spark.createDataFrame(rddA, schema1)
rddB = rdd2.map(lambda row: tuple(row[0].asDict()[c] for c in schema2.fieldNames()[:-1]) + (row[1],))
dfB = spark.createDataFrame(rddB, schema2)

dfA = dfA.withColumn("merged_col1", F.concat_ws(',', *allCols))
dfB = dfB.withColumn("merged_col2", F.concat_ws(',', *allCols))
dfC = dfA.join(dfB, dfA.index1 == dfB.index2).select("index1", "merged_col1", "merged_col2") 
dfD = dfC.select(col("index1"), split(col("merged_col1"), ",\s*").cast("array<int>").alias("ev1"), split(col("merged_col2"), ",\s*").cast("array<int>").alias("ev2"))
dfE = dfD.withColumn("matches", expr("filter(sequence(0,size(ev1)-1), (i -> ev1[i] == 1 and ev1[i] == ev2[i]))"))    
dfF = dfE.withColumn("matchesSize", F.size(F.col("matches")))
dfF.filter(F.col("matchesSize") > 0).show()

在这个模拟案例中返回:

+------+---------------+---------------+-------+-----------+
|index1|            ev1|            ev2|matches|matchesSize|
+------+---------------+---------------+-------+-----------+
|     0|[1, 1, 0, 0, 0]|[1, 1, 1, 1, 1]| [0, 1]|          2|
|     1|[1, 0, 0, 0, 1]|[0, 1, 1, 1, 1]|    [4]|          1|
+------+---------------+---------------+-------+-----------+

您需要根据计数等以及要显示的内容进一步操作数据。有足够的数据来做到这一点。

无论如何,我都不是 pyspark 专家,而是一个有趣的问题。并且这没有爆炸等。比其他答案更简单,我觉得可能具有良好的并行性,可以进一步优化,但我们在分区方面将其留给您。不利的是实际上应该在第一次找到时停止处理的所有值。我怀疑需要一个UDF。

于 2019-09-10T16:04:11.180 回答