我查看了 NLineInputFormat 的 getSplitsForFile() fn。我发现为输入文件创建了一个 InputStream,然后每 n 行创建它的迭代和拆分。它有效率吗?特别是在启动映射器任务之前在 1 个节点上发生此读取操作时。如果 1 有 5gb 的文件怎么办。基本上,这意味着文件数据被搜索两次,一次是在拆分创建期间,一次是在从映射器任务读取期间。如果这是一个瓶颈,hadoop 作业如何覆盖它?
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
if (status.isDirectory()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
<!-- my part of concern start -->
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
splits.add(createFileSplit(fileName, begin, length));
begin += length;
length = 0;
numLines = 0;
}
}
<!-- my part of concern end -->
if (numLines != 0) {
splits.add(createFileSplit(fileName, begin, length));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
编辑以将我的用例提供给 clément-mathieu
我的数据集是大输入文件,每个大约 2gb。文件中的每一行代表一个需要插入到数据库表中的记录(在我的情况下是 cassandra)我想将我的数据库的批量事务限制为每 n 行。我已经使用 nlineinputformat 成功地做到了这一点。我唯一担心的是生产中是否存在隐藏的性能瓶颈。