I am using Scala with Spark 1.5.

Given two DataFrames, DataFrame1 and DataFrame2, I want to look up the values for the keys in DataFrame1 within DataFrame2 and build DataFrame3 from the results. What makes this tricky is that each row of DataFrame1 holds several keys, and the output DataFrame should carry the keys and their looked-up values in the same order, as shown in the output DataFrame below. If possible I am looking for a distributed solution, since this has to run over millions of records (roughly 10 million). Any guidance on how to proceed, or pointers to useful methods, would be a great help. Thanks in advance!

Input: DataFrame1 (contract_id with up to 4 associated customers)
contract_id, cust1_id, cust2_id, cust3_id, cust4_id
500001,100000001,100000002,100000003,100000004
500305,100000001,100000002,100000007
500303,100000021
500702,110000045
500304,100000021,100000051,120000051
503001,540000012,510000012,500000002,510000002
503051,880000045
Input: DataFrame2 (customer master lookup data)
cust_id,date_of_birth
100000001,1988-11-04
100000002,1955-11-16
100000003,1980-04-14
100000004,1980-09-26
100000007,1942-03-07
100000021,1964-06-22
100000051,1920-03-12
120000051,1973-11-17
110000045,1955-11-16
880000045,1980-04-14
540000012,1980-09-26
510000012,1973-03-15
500000002,1958-08-18
510000002,1942-03-07

Output: DataFrame3

contract_id, cust1_id, cust2_id, cust3_id, cust4_id, cust1_dob, cust2_dob, cust3_dob, cust4_dob 
500001,100000001,100000002,100000003,100000004,1988-11-04,1955-11-16,1980-04-14,1980-09-26
500305,100000001,100000002,100000007,         ,1988-11-04,1955-11-16,1942-03-07
500303,100000021,         ,         ,         ,1964-06-22
500702,110000045,         ,         ,         ,1955-11-16
500304,100000021,100000051,120000051,         ,1964-06-22,1920-03-12,1973-11-17
503001,540000012,510000012,500000002,510000002,1980-09-26,1973-03-15,1958-08-18,1942-03-07
503051,880000045,         ,         ,         ,1980-04-14

1 Answer

This may not be the most efficient solution, but it works for your case.

  // Assumes a SparkSession in scope as `spark` (Spark 2.x style; on 1.5 you would use sqlContext.implicits._).
  import spark.implicits._

  // DataFrame1: contracts with up to four customer ids; empty strings mark unused slots.
  val df1 = spark.sparkContext
    .parallelize(
      Seq(
        ("500001", "100000001", "100000002", "100000003", "100000004"),
        ("500305", "100000001", "100000002", "100000007", ""),
        ("500303", "100000021", "", "", ""),
        ("500702", "110000045", "", "", ""),
        ("500304", "100000021", "100000051", "120000051", ""),
        ("503001", "540000012", "510000012", "500000002", "510000002"),
        ("503051", "880000045", "", "", "")
      ))
    .toDF("contract_id", "cust1_id", "cust2_id", "cust3_id", "cust4_id")

  // DataFrame2: the customer master, keyed by cust_id.
  val df2 = spark.sparkContext
    .parallelize(
      Seq(
        ("100000001", "1988-11-04"),
        ("100000002", "1955-11-16"),
        ("100000003", "1980-04-14"),
        ("100000004", "1980-09-26"),
        ("100000007", "1942-03-07"),
        ("100000021", "1964-06-22"),
        ("100000051", "1920-03-12"),
        ("120000051", "1973-11-17"),
        ("110000045", "1955-11-16"),
        ("880000045", "1980-04-14"),
        ("540000012", "1980-09-26"),
        ("510000012", "1973-03-15"),
        ("500000002", "1958-08-18"),
        ("510000002", "1942-03-07")
      ))
    .toDF("cust_id", "date_of_birth")

  // Left-join df2 once per customer column, dropping the duplicate key column
  // and renaming date_of_birth so each join contributes a distinct dob column.
  val finalDF = df1
    .join(df2, df1("cust1_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust1_dob")
    .join(df2, df1("cust2_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust2_dob")
    .join(df2, df1("cust3_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust3_dob")
    .join(df2, df1("cust4_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust4_dob")

  finalDF.na.fill("").show() // nulls from unmatched (empty) ids become empty strings
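
Since the question stresses scale (around 10 million records), it is worth noting that the approach above joins df2 four times. A possible alternative is to melt the four customer columns into long form, join the customer master once, and pivot the dates back into one row per contract. The sketch below is only illustrative: it assumes Spark 1.6+ (pivot does not exist in 1.5, so on 1.5 the repeated joins above remain the direct option), and the helper names custCols, longDF, and dobs are my own.

  import org.apache.spark.sql.functions._

  // Melt the four customer columns into one (contract_id, pos, cust_id) row per slot.
  val custCols = Seq("cust1_id", "cust2_id", "cust3_id", "cust4_id")
  val longDF = df1.select(
    col("contract_id"),
    explode(array(custCols.zipWithIndex.map { case (c, i) =>
      struct(lit(i + 1).as("pos"), col(c).as("cust_id"))
    }: _*)).as("cust")
  ).select(col("contract_id"), col("cust.pos").as("pos"), col("cust.cust_id").as("cust_id"))

  // One join against the customer master, then pivot the dates back to wide form.
  val dobs = longDF
    .join(df2, longDF("cust_id") === df2("cust_id"), "left")
    .groupBy("contract_id")
    .pivot("pos", Seq(1, 2, 3, 4))
    .agg(first("date_of_birth"))
    .toDF("contract_id", "cust1_dob", "cust2_dob", "cust3_dob", "cust4_dob")

  df1.join(dobs, "contract_id").na.fill("").show()

Whether this beats four joins depends on your data and cluster; it mainly trades four scans of df2 for a single join plus a groupBy/pivot shuffle.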
answered 2017-05-18T05:19:46.803