我有一个文件夹,其中包含具有不同模式的镶木地板文件,它们都有一个保证存在的公共字段。我想根据该字段过滤行并将其写回其他镶木地板文件。
spark中的类似动作将相当简单,看起来像
val filtered = rawsDF.filter(!col("id").isin(idsToDelete: _*))
问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的模式
ParquetInputFmt(path: Path, messageType: MessageType) extends ParquetInputFormat[User](path, messageType)
或使用这样的源函数:
class ParquetSourceFunction extends SourceFunction[String]{
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val inputPath = "s3a://foo/day=01/"
val conf = new Configuration()
conf.setBoolean("recursive.file.enumeration", true)
conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
val readFooter = ParquetFileReader.open(hadoopFile)
val metadata = readFooter.getFileMetaData
val schema = metadata.getSchema
val parquetFileReader = new ParquetFileReader(hadoopFile, ParquetReadOptions.builder().build())
parquetFileReader.getFilteredRecordCount
var pages: PageReadStore = null
try {
while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
val rows = pages.getRowCount
val columnIO = new ColumnIOFactory().getColumnIO(schema)
val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
(0L until rows).foreach { _ =>
val group: Group = recordReader.read()
val ind = group.getType.getFieldIndex("id")
val id = group.getInteger(ind, ind)
if (!listOfIds.contains(id))
ctx.collect(?) // how can I get the original row ?
}
}
}
}
我对后者的问题是我无法获取原始数据
有任何想法吗 ?