0

我对 Hadoop 和 MapReduce 编程完全陌生,我正在尝试使用 Common Crawl 的数据编写我的第一个 MapReduce 程序。

我想从 AWS 读取 2015 年 4 月的所有数据。例如,如果我想在命令行中下载 2015 年 4 月的所有数据,我会这样做:

s3cmd 获取 s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz

此命令行工作,但我不想下载 2015 年 4 月的所有数据,我只想读取所有“warc.wat.gz”文件(以便分析数据)。

我尝试创建我的工作,看起来像这样:

public class FirstJob extends Configured implements Tool {
    private static final Logger LOG = Logger.getLogger(FirstJob.class);

    /**
     * Main entry point that uses the {@link ToolRunner} class to run the Hadoop
     * job.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FirstJob(), args);
        System.out.println("done !!");
        System.exit(res);
    }

    /**
     * Builds and runs the Hadoop job.
     * 
     * @return 0 if the Hadoop job completes successfully and 1 otherwise.
     */
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        //
        Job job = new Job(conf);
        job.setJarByClass(FirstJob.class);
        job.setNumReduceTasks(1);

        //String inputPath = "data/*.warc.wat.gz";
        String inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz";
        LOG.info("Input path: " + inputPath);
        FileInputFormat.addInputPath(job, new Path(inputPath));

        String outputPath = "/tmp/cc-firstjob/";
        FileSystem fs = FileSystem.newInstance(conf);
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setInputFormatClass(WARCFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(FirstJobUrlTypeMap.ServerMapper.class);
        job.setReducerClass(LongSumReducer.class);

        if (job.waitForCompletion(true)) {
            return 0;
        } else {
            return 1;
        }
    }

但我有这个错误:

线程“main”java.lang.IllegalArgumentException 中的异常:AWS 访问密钥 ID 和秘密访问密钥必须分别指定为 s3n URL 的用户名或密码,或者通过设置 fs.s3n.awsAccessKeyId 或 fs.s3n。 awsSecretAccessKey 属性(分别)。

我怎样才能解决我的问题?提前致谢,

4

1 回答 1

0

我解决了我的问题。在代码中,更改:

 Configuration conf = getConf();
 //
 Job job = new Job(conf);

Configuration conf = new Configuration();
conf.set("fs.s3n.awsAccessKeyId", "your_key");
conf.set("fs.s3n.awsSecretAccessKey", "your_key");
Job job = new Job(conf);
于 2015-07-10T10:35:50.097 回答