1

我查看了 NLineInputFormat 的 getSplitsForFile() fn。我发现为输入文件创建了一个 InputStream,然后每 n 行创建它的迭代和拆分。它有效率吗?特别是在启动映射器任务之前在 1 个节点上发生此读取操作时。如果 1 有 5gb 的文件怎么办。基本上,这意味着文件数据被搜索两次,一次是在拆分创建期间,一次是在从映射器任务读取期间。如果这是一个瓶颈,hadoop 作业如何覆盖它?

 public static List<FileSplit> getSplitsForFile(FileStatus status,
          Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit> ();
        Path fileName = status.getPath();
        if (status.isDirectory()) {
          throw new IOException("Not a file: " + fileName);
        }
        FileSystem  fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
          FSDataInputStream in  = fs.open(fileName);
          lr = new LineReader(in, conf);
          Text line = new Text();
          int numLines = 0;
          long begin = 0;
          long length = 0;
          int num = -1;
<!-- my part of concern start -->
          while ((num = lr.readLine(line)) > 0) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
              splits.add(createFileSplit(fileName, begin, length));
              begin += length;
              length = 0;
              numLines = 0;
            }
          }
<!-- my part of concern end -->
          if (numLines != 0) {
            splits.add(createFileSplit(fileName, begin, length));
          }
        } finally {
          if (lr != null) {
            lr.close();
          }
        }
        return splits; 
      }

编辑以将我的用例提供给 clément-mathieu

我的数据集是大输入文件,每个大约 2gb。文件中的每一行代表一个需要插入到数据库表中的记录(在我的情况下是 cassandra)我想将我的数据库的批量事务限制为每 n 行。我已经使用 nlineinputformat 成功地做到了这一点。我唯一担心的是生产中是否存在隐藏的性能瓶颈。

4

1 回答 1

2

基本上,这意味着文件数据被搜索两次,一次是在拆分创建期间,一次是在从映射器任务读取期间。

是的。

这样做的目的InputFormat是为每 N 行创建一个拆分。计算分割边界的唯一方法是读取这个文件并找到换行符。此操作可能成本高昂,但如果这是您需要的,则无法避免。

如果这是一个瓶颈,hadoop 作业如何覆盖它?

不确定是否理解问题。

NLineInputFormat 不是默认的 InputFormat,很少有用例需要它。如果您阅读该类的 javadoc,您会发现该类的存在主要是为了将参数提供给令人尴尬的并行作业(=“小”输入文件)。

大多数 InputFormat 不需要读取文件来计算拆分。他们通常使用硬性规则,例如拆分应该是 128MB每个 HDFS 块一个拆分,并且 RecordReader 将处理真正的拆分开始/结束偏移量。

如果成本NLineInputFormat.getSplitsForFile是一个问题,我真的会回顾为什么我需要使用它InputFormat。您要做的是限制映射器中业务流程的批量大小。为每 N 行创建一个映射器,NLineInputFormat这意味着一个映射器永远不会执行超过一个批量事务。您似乎不需要此功能,您只想限制批量事务的大小,但不关心映射器是否按顺序执行其中几个。所以你付出了你发现的代码的成本,没有任何回报。

我会TextInputFormat在映射器中使用并创建批处理。在伪代码中:

setup() {
  buffer = new Buffer<String>(1_000_000);
}

map(LongWritable key, Text value) {
  buffer.append(value.toString())
  if (buffer.isFull()) {
    new Transaction(buffer).doIt()
    buffer.clear()
  }
}

cleanup() {
  new Transaction(buffer).doIt()
  buffer.clear()
}

默认情况下,每个 HDFS 块都会创建一个映射器。如果您认为这太多或太少,mapred.(max|min).split.size变量允许增加或减少并行度。

基本上,虽然方便NLineInputFormat对于您的需要来说太细了。TextInputFormat您可以使用和播放 几乎相同的东西*.split.size,而不涉及读取文件来创建拆分。

于 2014-08-16T18:24:06.467 回答