
I need to group a set of CSV rows by a particular column and do some processing on each group.

    JavaRDD<String> lines = sc.textFile("somefile.csv");
    JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser());
    List<String> keys = pairRDD.keys().distinct().collect();
    for (String key : keys) {
        List<String> rows = pairRDD.lookup(key);

        noOfVisits = rows.size();
        country = COMMA.split(rows.get(0))[6];
        accessDuration = getAccessDuration(rows, timeFormat);
        Map<String, Integer> counts = getCounts(rows);
        whitepapers = counts.get("whitepapers");
        tutorials = counts.get("tutorials");
        workshops = counts.get("workshops");
        casestudies = counts.get("casestudies");
        productPages = counts.get("productpages");
    }

    private static long dateParser(String dateString) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma");
        Date date = format.parse(dateString);
        return date.getTime();
    }
dateParser is called for each row; the min and max timestamps for the group are then taken to compute the access duration. The other fields are simple string matches.

pairRDD.lookup is very slow. Is there a better way to do this with Spark?


1 Answer


I think you can simply use that column as the key and do a groupByKey. The operation on those rows isn't mentioned; if it is a function that somehow combines the rows, you could even use reduceByKey (a small sketch of that case follows the example below).

Something like:

    import org.apache.spark.SparkContext._  // implicit pair functions
    val pairs = lines.map(parser _)
    val grouped = pairs.groupByKey
    // here grouped is of the form: (key, Iterable[String])
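For the reduceByKey case mentioned above, a minimal hypothetical sketch (assuming, purely for illustration, that all you needed per key were the number of visits):

    // Hypothetical: if the per-group work reduces to counting rows,
    // map each row to 1 and add the counts pairwise per key.
    val visitCounts = pairs.mapValues(_ => 1).reduceByKey(_ + _)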

*EDIT* After looking at the processing, I think it would be more efficient to map each row to the data it contributes and then use aggregateByKey to reduce it all to a single total per key. aggregateByKey takes two functions and a zero value:

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)]

The first function is the partition-level aggregator: it runs locally within each partition, building a partial aggregate per partition. The combine operation then takes those partial aggregates and merges them to produce the final result.
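As a toy illustration of how the two functions interact (not part of the original answer; it just sums Int values per key):

    // seqOp folds each value into the per-partition accumulator;
    // combOp merges the accumulators produced by different partitions.
    val nums = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val sums = nums.aggregateByKey(0)(
      (acc, v) => acc + v,  // seqOp: runs within each partition
      (a, b)   => a + b     // combOp: merges partition results
    )
    // sums contains ("a", 3) and ("b", 3)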

Something like this:

    val lines = sc.textFile("somefile.csv")
    // parse returns a key and a decomposed Record of the values tracked:
    // (key, Record("country", timestamp, "whitepaper", ...))
    val records = lines.map(parse(_))

    val totals = records.aggregateByKey((0, Set.empty[String], Long.MaxValue, Long.MinValue, Map.empty[String, Int]))(
      { case ((count, countrySet, minTime, maxTime, counterMap), record) =>
          (count + 1, countrySet + record.country,
           math.min(minTime, record.timestamp), math.max(maxTime, record.timestamp),
           ...) },  // also update counterMap for this record's page type
      (acc1, acc2) => ???  // merge each field of the two accumulators
    )
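For completeness, a fuller runnable sketch of the same idea, with the elided parts filled in. The `Record` case class, the `parse` helper, the column positions, and the timestamp format (copied from the question's dateParser) are assumptions for illustration, not from the original question; `sc` is an existing SparkContext.

    import org.apache.spark.SparkContext._  // implicit pair RDD functions (needed on Spark 1.x)

    // Hypothetical record shape; the real column layout is not given in the question.
    case class Record(country: String, timestamp: Long, pageType: String)

    // Accumulator: (visit count, countries seen, min time, max time, page-type counts)
    type Acc = (Int, Set[String], Long, Long, Map[String, Int])
    val zero: Acc = (0, Set.empty[String], Long.MaxValue, Long.MinValue, Map.empty[String, Int])

    // Timestamp parsing mirrors the question's dateParser.
    def parseTime(s: String): Long =
      new java.text.SimpleDateFormat("MMM dd yyyy HH:mma").parse(s).getTime

    // Assumed layout: key in column 0, timestamp in column 1, page type in column 2, country in column 6.
    def parse(line: String): (String, Record) = {
      val cols = line.split(",")
      (cols(0), Record(cols(6), parseTime(cols(1)), cols(2)))
    }

    // seqOp: fold one record into the per-partition accumulator.
    def seqOp(acc: Acc, r: Record): Acc = {
      val (count, countries, minT, maxT, counts) = acc
      (count + 1,
       countries + r.country,
       math.min(minT, r.timestamp),
       math.max(maxT, r.timestamp),
       counts.updated(r.pageType, counts.getOrElse(r.pageType, 0) + 1))
    }

    // combOp: merge two partial accumulators field by field.
    def combOp(a: Acc, b: Acc): Acc =
      (a._1 + b._1,
       a._2 ++ b._2,
       math.min(a._3, b._3),
       math.max(a._4, b._4),
       (a._5.keySet ++ b._5.keySet).map(k => k -> (a._5.getOrElse(k, 0) + b._5.getOrElse(k, 0))).toMap)

    // One pass over the file, no per-key lookup.
    val totalsPerKey = sc.textFile("somefile.csv").map(parse).aggregateByKey(zero)(seqOp, combOp)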

This is the most efficient way to do key-based aggregation in Spark.

Answered 2014-10-28T15:30:40.820