
When I try to pass an IntWritable from my mapper to the reducer, I get the following error:

INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation

This is my mapper:

public class testMapper extends TableMapper<Object, Object>
{

    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {

        try
        {
            // get rowKey and convert it to string
            String inKey = new String(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to
            // string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit date and sales values
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));

        }

This is the reducer:

public class testReducer extends TableReducer<Object, Object, Object>
{

    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {

            int sum = 0;
            // loop through the different sales values and add them to sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }

            // create hbase put with rowkey as date

            Put insHBase = new Put(key.get());
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);

And the driver:

public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));

        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);

        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);

        job.waitForCompletion(true);
    }
}

1 Answer


context.write(null, insHBase);

The problem is that you are writing a Put out to the context, while HBase expects an IntWritable.

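You should write the outputs to the context and let HBase take care of storing them. HBase expects to store an IntWritable, but you are giving it a Put operation, which extends Mutation.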

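The HBase workflow is that you specify in the configuration where the output should go, and then simply write the output to the context. You don't have to do any manual Put operations in the reducer.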
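
It may also help to check which reduce method actually runs. The exception in the question reports an IntWritable being cast to Mutation, which suggests that the mapper's values are reaching the table output unchanged. One plain-Java behavior that could produce that symptom, offered here as an assumption and not confirmed anywhere in this thread, is that a class declared with Object type arguments inherits reduce(Object, Iterable<Object>, ...), so a reduce method taking narrower parameter types is only an overload and the inherited pass-through method is the one that gets called. The sketch below uses no Hadoop or HBase classes; BaseReducer, LooseReducer, TypedReducer and drive are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverrideSketch {

    // Hypothetical stand-in for a generic reducer base class (not a Hadoop type).
    // Its default behavior passes every value through to the output unchanged.
    static class BaseReducer<K, V> {
        final List<Object> out = new ArrayList<>();

        public void reduce(K key, Iterable<V> values) {
            for (V value : values) {
                out.add(value);
            }
        }
    }

    // Declared with Object type arguments, so the inherited method is
    // reduce(Object, Iterable<Object>). The method below takes a String key,
    // which makes it an overload of that method, not an override.
    static class LooseReducer extends BaseReducer<Object, Object> {
        public void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.add("sum=" + sum);
        }
    }

    // Declared with concrete type arguments. @Override makes the compiler
    // check that this method really replaces the inherited one.
    static class TypedReducer extends BaseReducer<String, Integer> {
        @Override
        public void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.add("sum=" + sum);
        }
    }

    // Calls reduce through the base type, the way a framework would.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<Object> drive(BaseReducer reducer) {
        reducer.reduce("2014-10-22", Arrays.asList(3, 4));
        return reducer.out;
    }

    public static void main(String[] args) {
        System.out.println(drive(new LooseReducer())); // prints [3, 4]
        System.out.println(drive(new TypedReducer())); // prints [sum=7]
    }
}
```

If this is what happens in the job above, adding @Override to reduce in testReducer would turn the mismatch into a compile error, which is a cheap way to test the assumption.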

Answered 2014-10-22T18:02:09.803