A Brief Analysis of MapReduce Examples (4)

Published: 2015-07-10; Source: uml.org.cn; Author: open经验库; Tags: database

  14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680

  14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output

  14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0

  14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

  14/12/17 23:04:20 INFO mapred.LocalJobRunner:

  14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.

  14/12/17 23:04:20 INFO mapred.LocalJobRunner:

  14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments

  14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes

  14/12/17 23:04:20 INFO mapred.LocalJobRunner:

  14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

  14/12/17 23:04:20 INFO mapred.LocalJobRunner:

  14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now

  14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out

  14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce

  14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.

  14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%

  14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001

  14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14

  14/12/17 23:04:20 INFO mapred.JobClient: FileSystemCounters

  14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=46040

  14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=51471

  14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=52808

  14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=98132

  14/12/17 23:04:20 INFO mapred.JobClient: Map-Reduce Framework

  14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=3

  14/12/17 23:04:20 INFO mapred.JobClient: Combine output records=0

  14/12/17 23:04:20 INFO mapred.JobClient: Map input records=4

  14/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=0

  14/12/17 23:04:20 INFO mapred.JobClient: Reduce output records=4

  14/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=8

  14/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=78

  14/12/17 23:04:20 INFO mapred.JobClient: Combine input records=0

  14/12/17 23:04:20 INFO mapred.JobClient: Map output records=4

  14/12/17 23:04:20 INFO mapred.JobClient: Reduce input records=4

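  As the log shows, by default MapReduce passes its input through to the output unchanged: the default Mapper and Reducer are identity functions, so each record is written out as the key and value it was read with (here, a line's byte offset and its text).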
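  To make the pass-through behaviour concrete, here is a minimal plain-Java sketch that imitates the default pipeline without using Hadoop at all. It assumes TextInputFormat-style records (byte offset as key, line text as value), an identity map, a sort by key standing in for the shuffle, an identity reduce, and TextOutputFormat-style lines of the form key, tab, value. The class name, method name, and sample input are hypothetical and chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the default job; not Hadoop code.
class IdentityPipelineSketch {

    static String run(String input) {
        // Sorted map stands in for the shuffle/sort: values grouped by key, keys ascending.
        TreeMap<Long, List<String>> groups = new TreeMap<>();

        long offset = 0;
        for (String line : input.split("\n")) {
            // "InputFormat": one record per line, keyed by the line's byte offset.
            // "Mapper": identity, so the (offset, line) pair is emitted as-is.
            groups.computeIfAbsent(offset, k -> new ArrayList<>()).add(line);
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
        }

        // "Reducer": identity, so every value is emitted with its key.
        // "OutputFormat": key and value separated by a tab, one record per line.
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Long, List<String>> e : groups.entrySet()) {
            for (String value : e.getValue()) {
                out.append(e.getKey()).append('\t').append(value).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Made-up two-line input.
        System.out.print(run("hello world\nbye world\n"));
    }
}
```

  For the made-up input above, the sketch prints the two lines back in order, each prefixed by its byte offset (0 and 12) and a tab: the text itself is untouched, only the key is added in front.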

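  The following describes some of MapReduce's parameters and their default settings:

  (1) InputFormat class

  Divides the input data into splits, and further breaks each split into <key, value> pairs that serve as the input to the map function.

  (2) Mapper class

  Implements the map function, producing intermediate results from the input <key, value> pairs.

  (3) Combiner

  Implements the combine function, merging intermediate key-value pairs that share the same key.

  (4) Partitioner class

  Implements the getPartition function, which during the shuffle phase divides the intermediate data into R partitions by key; each partition is handled by one reduce task.

  (5) Reducer class

  Implements the reduce function, merging the intermediate results to produce the final result.

  (6) OutputFormat class

  Responsible for writing out the final result.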

  With these defaults written out explicitly, the job code shown earlier can be rewritten as:

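import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;

public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options; what remains should be <in> <out>.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: LazyMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        // Input: each line becomes a (byte offset, line text) record.
        job.setInputFormatClass(TextInputFormat.class);
        // The base Mapper class is the identity map.
        job.setMapperClass(Mapper.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        // The base Reducer class is the identity reduce.
        job.setReducerClass(Reducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // TextOutputFormat is the concrete default; FileOutputFormat itself is abstract.
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}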
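
  The HashPartitioner set in the job above decides which reduce task receives each intermediate key. The plain-Java sketch below reproduces the arithmetic Hadoop's HashPartitioner uses in getPartition: clear the sign bit of the key's hash code, then take the remainder modulo the number of reduce tasks. It does not depend on Hadoop; java.lang.Long is used here as a stand-in for LongWritable, so the actual partition numbers in a real job depend on the key class's own hashCode. The class name and the sample keys are hypothetical.

```java
// Hypothetical stand-in illustrating hash partitioning; not Hadoop code.
class HashPartitionSketch {

    /** Maps a key to a partition in [0, numReduceTasks). */
    static int getPartition(Object key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
        // hash code can never produce a negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;          // "R" in the text; value chosen for illustration
        long[] keys = {0L, 12L, 22L, 0L}; // made-up byte-offset keys
        for (long k : keys) {
            System.out.println(k + " -> partition "
                    + getPartition(Long.valueOf(k), numReduceTasks));
        }
    }
}
```

  Because the partition depends only on the key's hash code and R, equal keys always land in the same partition, which is what lets a single reduce task see every value for a given key.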

Original source: http://www.uml.org.cn/sjjm/201501201.asp