1.MapReduce概述
Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。
Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。
应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
虽然Hadoop框架是用Java实现的,但Map/Reduce应用程序则不一定要用 Java来写 。
2.样例分析:单词计数
1、WordCount源码分析
单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到
单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:
(1)Map过程
Map过程需要继承org.apache.hadoop.mapreduce包中的Mapper类,并重写map方法
通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中的value值存储的是文本文件中的一行(以回车符作为行结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将作为map方法的结果输出,其余的工作都交由MapReduce框架处理。其中IntWritable和Text类是Hadoop对int和string类的封装,这些类能够被串行化,以方便在分布式环境中进行数据交换。
TokenizerMapper的实现代码如下:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { System.out.println("key = " + key.toString());//添加查看key值 System.out.println("value = " + value.toString());//添加查看value值 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } |
(2)Reduce过程
Reduce过程需要继承org.apache.hadoop.mapreduce包中的Reducer类,并重写reduce方法
reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词的出现总次数
IntSumReduce类的实现代码如下:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } |
原文转自:http://www.uml.org.cn/sjjm/201501201.asp