我有一组文件说 10 个文件和一个大文件,它是所有 10 个文件的总和。
我将它们添加到分布式缓存、作业配置中。
当我在 reduce 中阅读它们时,我观察到以下几点:
我只读取了在 reduce 方法中添加到分布式缓存中的选定文件。我预计速度会更快,因为与在所有 reduce 方法中读取大文件相比,每个 reduce 中读取的文件大小更小。但是,它变慢了。
此外,当我将其拆分为更小的文件并将它们添加到分布式缓存时,问题变得更糟。工作本身在很长一段时间后才开始运行。
我找不到原因。请帮忙。
我有一组文件说 10 个文件和一个大文件,它是所有 10 个文件的总和。
我将它们添加到分布式缓存、作业配置中。
当我在 reduce 中阅读它们时,我观察到以下几点:
我只读取了在 reduce 方法中添加到分布式缓存中的选定文件。我预计速度会更快,因为与在所有 reduce 方法中读取大文件相比,每个 reduce 中读取的文件大小更小。但是,它变慢了。
此外,当我将其拆分为更小的文件并将它们添加到分布式缓存时,问题变得更糟。工作本身在很长一段时间后才开始运行。
我找不到原因。请帮忙。
我认为您的问题在于在 reduce() 中读取文件。您应该阅读 configure()(使用旧 API)或 setup()(使用新 API)中的文件。因此,对于每个 reducer,它只会被读取一次,而不是为 reducer 的每个输入组读取它(基本上,每次调用 reduce 方法)
您可以编写如下内容:使用新的 mapreduce API (org.apache.hadoop.mapreduce.*) -
public static class ReduceJob extends Reducer<Text, Text, Text, Text> {
...
Path file1;
Path file2;
...
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// Get the file from distributed cached
file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1];
// parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap.
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
...
}
}
使用旧的 mapred API (org.apache.hadoop.mapred.*) -
public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
...
Path file1;
Path file2;
...
@Override
public void configure(JobConf job) {
// Get the file from distributed cached
file1 = DistributedCache.getLocalCacheFiles(job)[0]
file2 = DistributedCache.getLocalCacheFiles(job)[1]
...
// parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap.
}
@Override
public synchronized void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
...
}
}