0

我正在尝试通过简单的 Spark 步骤执行来运行 EMR 集群,但遇到了无法解决的错误。该程序在我在 Eclipse 中本地运行时有效,但在 EMR 集群上运行时无效。该程序只是尝试将 S3 上的 CSV 文件转换为 Parquet 格式。

在 EMR 中运行时,出现以下错误:

原因:com.univocity.parsers.common.TextParsingException:解析输入的长度 (1000001) 超过了解析器设置中定义的最大字符数 (1000000)。 已解析内容中已识别的行分隔符。这可能是错误的原因。解析器设置中的行分隔符设置为“\n”。解析内容:

我没有任何超过 1000000 限制的字段。我尝试从 s3、s3n 和 s3a 位置读取数据。

    import org.apache.spark.SparkSession
    import org.apache.spark.sql.types._

    object TestEMR {
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()
        val schema = StructType(
            Array(
              StructField("field1", StringType ,nullable = true),
              StructField("field2", IntegerType ,nullable = true),
              StructField("field3", IntegerType ,nullable = true),
              StructField("field4", TimestampType ,nullable = true),
              StructField("field5", TimestampType ,nullable = true),
              StructField("field6", StringType ,nullable = true),
              StructField("field7", StringType ,nullable = true),
              StructField("field8", StringType ,nullable = true),
              StructField("field9", StringType ,nullable = true),
              StructField("field10", StringType ,nullable = true),
              StructField("field11", StringType ,nullable = true),
              StructField("field12", StringType ,nullable = true),
              StructField("field13", StringType ,nullable = true),
              StructField("field14", StringType ,nullable = true),
              StructField("field15", StringType ,nullable = true),
              StructField("field16", StringType ,nullable = true),
              StructField("field17", StringType ,nullable = true),
              StructField("field18", StringType ,nullable = true),
              StructField("field19", StringType ,nullable = true),
              StructField("field20", StringType ,nullable = true)
            )
          )

        val df = spark.read
            .format("com.databricks.spark.csv")
            .schema(schema)
            .option("nullValue","")
            .option("treatEmptyValuesAsNulls","true")
            .load("s3://mybucket/input/myfile.csv")
       df.write.mode("append").parquet("s3://mybucket/output/myfile")
       spark.stop
      }
    }
4

2 回答 2

0

这个问题在spark-csv jira中仍然存在。如果您没有数据问题或读取为 RDD 然后创建数据框,他们提供了解决方法,例如使用开放的 csv 解析器。

val rdd = sc.textFile("file.csv")
// Here, filtering or transformation
//val filteredRDD = rdd.filter..
//val transformedRDD = rdd.map..

val df = new CsvParser().csvRdd(sqlContext, transformedRDD)
于 2017-01-27T12:38:57.153 回答
0

听起来它没有找到行尾,因此不断读取,直到达到单行 10K 字符的限制。

正如他们所说:检查该文件的换行符

于 2017-01-24T21:25:30.330 回答