第四章分布式计算框架MapReduce4.1初识MapReduceMapReduce是一种面向大规模数据并行处理的编程模型,也一种并行分布式计算框架。
在Hadoop流行之前,分布式框架虽然也有,但是实现比较复杂,基本都是大公司的专利,小公司没有能力和人力来实现分布式系统的开发。
Hadoop的出现,使用MapReduce框架让分布式编程变得简单。
如名称所示,MapReduce主要由两个处理阶段:Map阶段和Reduce 阶段,每个阶段都以键值对作为输入和输出,键值对类型可由用户定义。
程序员只需要实现Map和Reduce两个函数,便可实现分布式计算,而其余的部分,如分布式实现、资源协调、内部通信等,都是由平台底层实现,无需开发者关心。
基于Hadoop开发项目相对简单,小公司也可以轻松的开发分布式处理软件。
4.1.1 MapReduce基本过程MapReduce是一种编程模型,用户在这个模型框架下编写自己的Map函数和Reduce函数来实现分布式数据处理。
MapReduce程序的执行过程主要就是调用Map函数和Reduce函数,Hadoop把MapReduce程序的执行过程分为Map和Reduce两个大的阶段,如果细分可以为Map、Shuffle(洗牌)、Reduce三个阶段。
Map含义是映射,将要操作的每个元素映射成一对键和值,Reduce含义是归约,将要操作的元素按键做合并计算,Shuffle在第三节详细介绍。
下面以一个比较简单的示例,形象直观介绍一下Map、Reduce阶段是如何执行的。
有一组图形,包含三角形、圆形、正方形三种形状图形,要计算每种形状图形的个数,见下图4-1。
图:4-1 map/reduce计算不同形状的过程在Map阶段,将每个图形映射成形状(键Key)和数量(值Value),每个形状图形的数量值是“1”;Shuffle阶段的Combine(合并),相同的形状做归类;在Reduce阶段,对相同形状的值做求和计算。
在用户编写程序时,需要编写Map和Reduce两个函数,而Combine是由集群完成的。
在Map函数中,将每一个形状作为参数传入函数,用户编写的程序将形状和数字“1”作为map映射。
对于第一个图形,这个map的Key就是“三角形”,value就是“1”,这样就形成了K/V对,同理对于后面所有的图形都做这样的映射。
映射完成后就形成了以每个形状Key 和数字“1”为Value的K/V键值对,这样Map阶段就工作完成了。
在Combine中,集群将所有Key相同的键值对进行组合,图中可以看出“三角形”后面有4个“1”,圆形后有6个“1”,正方形后有三个“1”,这些“1”正是map阶段键值对的Value。
在Reduce阶段,把相同的Key的Value进行求和就可以统计出每个形状的个数了。
这是单个Map/Reduce任务的执行过程,在集群中运行时系统会自动运行多个Map任务,以并行的方式处理多个输入的数据块,实现基于分布式的并行计算。
其实在我们现实生活中,很多数据要么本身就为键值对的形式,要么可以用键值对这种方式来表示,例如电话号码和通话记录,文件名和文件存储的数据等,键值对是存在于我们身边非常普通的模型。
4.1.2 MapReduce编程思想MapReduce的核心思想只有四个字:分而治之。
MapReduce采用"分而治之",这样就把对大规模数据集的复杂计算任务分解成“简单任务”,分发给集群中各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。
简单地说,MapReduce就是"任务的分解与结果的汇总"。
把复杂的任务分解为若干个“简单任务”来处理。
“简单任务”包含三层含义:计算量变小,数据或计算的规模相对原来任务要大大缩小;就近计算,即任务会被分配到存放所需数据的节点上进行计算,也称本地计算原则;独立运行,这些小任务可以并行计算,彼此间几乎没有依赖关系。
在Reduce阶段,负责对Map阶段的结果进行汇总。
至于需要多少个Reduce任务来汇总,则可以由用户根据需要指定。
”4.1.3 MapReduce的编程模型MapReduce借鉴了函数式程序设计语言Lisp中的思想,定义了的Map和Reduce两个抽象的编程接口,由用户去编程实现。
Map函数: (k1; v1)-->[(k2; v2)]。
输入:键值对(k1;v1)表示接收的数据;处理:文档数据记录(如文本文件中的一行),以键值对的形式传入map函数,处理完之后以另一种键值对的形式输出中间处理结果[k2;v2];输出:键值对[k2;v2]表示的一组中间数据;洗牌(Shuffle):对中间结果数据处理。
对Map输出的[k2;v2]数据,进行分组、合并、排序,传送到Reduce端。
Reduce函数: (k2; [v2])-->[(k3; v3)]。
输入: Map输出的一组键值对[(k2; v2)] 将被进行合并处理将同样主键下的不同数值合并到一个列表[v2]中,故Reduce的输入为(k2;[v2]);处理:对传入的中间结果列表数据进行某种整理或进一步的处理,并输出最终的某种键值对形式的结果[(k3;v3)];输出:键值对[k3;v3]表示的最终结果数据。
下面以处理文本为例,说明编程模型中各阶段执行步骤。
Map函数执行:读取输入文件内容,对输入文件的每一行,解析成Key、Value对,通常行数(行偏移量)为Key值,每行文本作为Value值。
每一个键值对调用一次Map函数。
写自己的逻辑,对输入的Key、Value处理,转换成新的Key、Value输出。
在Shuffle中对输出的Key、Value进行分区。
对不同分区的数据,按照Key进行排序、分组。
相同Key的Value放到一个集合中。
Reduce函数执行:对多个Map任务的输出,按照不同的分区,通过网络copy到不同的Reduce节点。
对多个Map任务的输出进行合并、排序。
写Reduce函数自己的逻辑,对输入的Key、Value处理,转换成新的Key、Value输出。
把Reduce的输出保存到文件中。
4.2 MapReduce编程4.2.1 程序入门最简单的MapReduce应用程序至少包含 3 个部分:一个 Map 函数、一个 Reduce 函数和一个 main 函数。
在运行一个mapreduce计算任务时候,任务过程被分为两个阶段:map 阶段和reduce阶段,每个阶段都是用键值对(key/value)作为输入(input)和输出(output)。
main 函数将作业控制和文件输入/输出结合起来。
下面我们用一个单词统计的例子来学习MapReduce程序执行过程。
•并行读取文本中的内容,然后进行MapReduce操作。
•Map过程:并行读取文本,对读取的单词进行map操作,每个词都以<key,value>形式生成。
一个有三行文本的文件进行MapReduce操作。
读取第一行Hello World Bye World ,分割单词形成Map。
<Hello,1> <World,1> <Bye,1> <World,1>读取第二行Hello Hadoop Bye Hadoop ,分割单词形成Map。
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>读取第三行Bye Hadoop Hello Hadoop,分割单词形成Map。
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>•Reduce操作是对map的结果进行排序,合并,最后得出词频。
经过进一步处理(combiner),将形成的Map根据相同的key组合成value数组。
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>循环执行Reduce(K,V[]),分别统计每个单词出现的次数。
<Bye,3> <Hadoop,4> <Hello,3> <World,2>4.2.2第一个MapReduce程序根据上一小节介绍Word Count的MapReduce执行流程,这里我们来编写真正的第一个自己的MapReduce程序。
编程时只需要用户实现map和reduce两个方法,MapReduce执行原理中的其他步骤默认无需用户实现,而是由集群完成。
编写程序时先在本地开发环境中编写好代码测试通过后将程序打包上传到Hadoop集群中执行。
开发环境与HDSF章节中一致,需将相关Jar包下载到本地工程。
为了测试程序,在编写程序前将content.txt文件上传至HDFS的/mytest4/目录下,content.txt文件的内容为一段文本:the mission of the ASF is to provide software for the publicestablished in 1999 the ASF is a charitable organizationfunded by individual donations and corporate sponsorsASF is a good organization用户编写Map端程序和Reduce端程序,具体步骤如下:1、Map端编写步骤用户编写自己的Map类,需要继承org.apache.hadoop.mapreduce.Mapper类,并重写protected void map(KEYIN key, VALUEIN value, Context context)throws IOException, InterruptedException 方法。
下面是定义了一个类TokenizerMapper。
程序代码如下:代码4-1 TokenizerMapper.java1 import java.io.IOException;2 import java.util.StringTokenizer;3 import org.apache.hadoop.conf.Configuration;4 import org.apache.hadoop.fs.Path;5 import org.apache.hadoop.io.IntWritable;6 import org.apache.hadoop.io.Text;7 import org.apache.hadoop.mapreduce.Job;8 import org.apache.hadoop.mapreduce.Mapper;9 import org.apache.hadoop.mapreduce.Reducer;10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;1213 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {14 IntWritable one = new IntWritable(1);15 Text word = new Text();16 public void map(Object key, Text value, Context context) throws IOException, InterruptedException {17 // 将字符串按空格分开18 StringTokenizer itr = new StringTokenizer(value.toString());19 while (itr.hasMoreTokens()) {20 word.set(itr.nextToken());21 context.write(word, one);22 }23 }24 }代码中第1~11行,是导入相关的类,因为不同版本Hadoop中同名类较多,如果使用自动导入可能会将不同包下的同名类导入,初学者在导入时应多加注意。