我正在尝试通过简单的 Spark 步骤执行来运行 EMR 集群,但遇到了无法解决的错误。该程序在我在 Eclipse 中本地运行时有效,但在 EMR 集群上运行时无效。该程序只是尝试将 S3 上的 CSV 文件转换为 Parquet 格式。
在 EMR 中运行时,出现以下错误:
原因:com.univocity.parsers.common.TextParsingException:解析输入的长度 (1000001) 超过了解析器设置中定义的最大字符数 (1000000)。 已解析内容中已识别的行分隔符。这可能是错误的原因。解析器设置中的行分隔符设置为“\n”。解析内容:
我没有任何超过 1000000 限制的字段。我尝试从 s3、s3n 和 s3a 位置读取数据。
import org.apache.spark.SparkSession
import org.apache.spark.sql.types._
object TestEMR {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()
val schema = StructType(
Array(
StructField("field1", StringType ,nullable = true),
StructField("field2", IntegerType ,nullable = true),
StructField("field3", IntegerType ,nullable = true),
StructField("field4", TimestampType ,nullable = true),
StructField("field5", TimestampType ,nullable = true),
StructField("field6", StringType ,nullable = true),
StructField("field7", StringType ,nullable = true),
StructField("field8", StringType ,nullable = true),
StructField("field9", StringType ,nullable = true),
StructField("field10", StringType ,nullable = true),
StructField("field11", StringType ,nullable = true),
StructField("field12", StringType ,nullable = true),
StructField("field13", StringType ,nullable = true),
StructField("field14", StringType ,nullable = true),
StructField("field15", StringType ,nullable = true),
StructField("field16", StringType ,nullable = true),
StructField("field17", StringType ,nullable = true),
StructField("field18", StringType ,nullable = true),
StructField("field19", StringType ,nullable = true),
StructField("field20", StringType ,nullable = true)
)
)
val df = spark.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("nullValue","")
.option("treatEmptyValuesAsNulls","true")
.load("s3://mybucket/input/myfile.csv")
df.write.mode("append").parquet("s3://mybucket/output/myfile")
spark.stop
}
}