当前位置:
文档之家› 并行计算模型MapReduce
并行计算模型MapReduce
conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class);
•
• •
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
Map
Map
„
Map
Reduce „ Reduce
结果 1
结果 R
• 数据分布存储,带来计算上的并行化:
横向扩展
• 为实现横向扩展, 需要把数据存储在HDFS中
• Hadoop 将 MapReduce 计算转移到各机器上
• 每台承担一部分数据.
MapReduce操作执行流程图
例1:一个单词统计的实例
适合非结构化和结构化的 海量数据的搜索、挖掘、 分析与机器智能学习等
MapReduce 计算模型
• 分布式计算的瓶颈是网络带宽 • “本地计算”是最有效的一种节约 网络带宽的手段
• “移动计算比移动数据更经济”。
MapReduce的特点
• 输入的键值是不固定的,由分析人员选择 • 对于非结构化和半结构化数据,非常有效 • 适合于大规模数据的一次写入,多次查询 • MapReduce其核心就是高速、流式读写操作
按照首字母将Map 中不同桶中的字符 串集合放置到相应 的Reduce中进行处 理。
Splits分片
• Hadoop 将 MapReduce 的输入数据划分成等 长的小数据块split.
• Split意味着处理每个分片所需的时间,将少于处 理整个输入数据所化时间.
• Hadoop 为每个分片建立一个map任务,并由 该任务来执行用户定义的map函数,从而处理 分片中的每条记录.
MapReduce job代码
•
• • • • •
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1]));
• 在 org.apache.hadoop.io 中有
• LongWritable(Java Long), Text (Java
String), IntWritable (Java Integer).
Reducer 参数
• Reducer<Text, IntWritable, Text, IntWritable> • reduce前 2个输入参数 (Text和 IntWritable ) 应该与map的输出 类型一致
25
问题分析
• 对气象数据中的年份和温度进行分析
• 找出
• • • • 最高气温 平均气温 … 待处理的气象数据如下:
Example. 气象数据的格式
0057 332130 99999 19500101 0300 +51317 +028783 00450 1 010000 1
-0128 1 -0139 1 10268 1
• values 是一个 Iterator, 遍历这个 Iterator, 就可以得到属于同一个 key 的 所有 value.
• 此处,key是一个单词,value 是词频。只需要 将所有的 value 相加,就可以得到这个单词的 总的出现次数。
WordCount-Reduce
Reduce Input Reduce(K, V[ ]) { Int count = 0; For each v in V count += v; Collect(K, count); }
Map阶段数据本地优化
• Hadoop 在存储有输入数据(HDFS中)的节点上运行map任务 可以获得最佳性能.
• 分片大小应该与HDFS块block的大小一致, 64 MB • 如果分片跨越两块, 同一分片中的两部分 数据将频繁的 通过网络传输到map任务节点,导致低效率
方法:使用MapReduce分析
• 根据MapReduce的设计思路,采用Map 和 Reduce两阶段:
• 以键值对(key-value pair,KVP)作为输入/输 出, 其类型由程序员选择.
• 为此要定义两个函数:
• map 和 reduce函数.
MapReduce 数据流
• 最底下是一个Unix pipeline模拟 MapReduce 流程
Java MapReduce
• 代码中的三个组成:
• map函数, • reduce函数, • 运行job的代码. See Example 最高气温的Mapper
Mapper 参数
• Mapper<LongWritable, Text, Text, IntWritable>
• • • • input key 是long integer offset input value 是 line of text, output key 是 year output value 是 temperature (integer).
MapReduce —并行计算模式及其应用
杨文川 2014.2
主要内容
• • • • 1) 2) 3) 4) MapReduce产生背景 MapReduce编程模型 MapReduce实现机制 MapReduce案例分析
MapReduce 一种处理海量数据的并行编程模式,用于大规模数据集 (通常大于1TB)的并行运算。 “Map(映射)”、“Reduce(化简)”的概念和主要 思想,都是从函数式编程语言和矢量编程语言借鉴
Reduce
<Hello,1> <World,2> <Bye,1>
Internal Grouping
<Bye 1, 1, 1>
<Hello,1> <Hadoop,2> <Bye,1>
Reduce Output
<Hadoop 2, 2> Reduce
<Bye,1> <Hadoop,2> <Hello,1>
2, “Hello Hadoop Bye Hadoop” Map
Map(K, V) { For each word w in V Collect(w, 1); }
3, “Bye Hadoop Hello Hadoop” Map
实现 Reduce 类
• 这个类实现 Reducer 接口中的 reduce 方 法, 输入参数中的 key, values 是由 Map 任务输出的中间结果,
数据本地化
• 数据本地化是MapReduce 的灵魂,也是其高 效的原因 • MapReduce尽量将数据靠近计算节点,数据本 地化导致数据获取可以快速高效
• 网络带宽是数据中心最稀缺的资源
• MapReduce采用合理的网络拓扑设置来预留网络带 宽. • 这些布局不会影响MapReduce快速的CPU 分析能力.
无共享(share nothing)
• 无共享结构让MapReduce程序员无需考虑系统的 部分失效问题, • 因为自身的系统实现,能够检测到失败的map或 reduce任务 • 并让正常的机器重新执行这些失败的任务, • 因为各个任务之间彼此独立, • 任务的执行顺序是无关紧要的
设计目标
• MapReduce 设计目标是服务于那些 数分钟或数小时可完成的任务 • 最适合于内部通过高速网络连接的 单一数据中心内 • 这些数据中心内的计算机最好由可 靠、定制的硬件组成.
MapReduce计算流程
原始数据 1 原始数据 2 原始数据 M
MapReduce运行 模型 Map函数 ——对一部分原始数据进
行指定的操作。每个Map操作都针 对不同的原始数据,因此Map与 Map之间是互相独立的,这使得它 们可以充分并行化 Reduce操作——对每个Map所产 生的一部分中间结果进行合并操作, 每个Reduce所处理的Map中间结 果是互不交叉的,所有Reduce产 生的最终结果经过简单连接就形成 了完整的结果集
Output:
Number of occurrences of each word MapReduce Bye 3 Hadoop 4 Hello 3 World 2
对原始的数据进行分割 (Split),得到N个不同 的数据分块
每一个数据分块都 启动一个Map进行 处理。 每个Map中按照首 字母将字符串分配 到26个不同的桶中
WordCount-Map
Input
1, “Hello World Bye World
<Hello,1> <World,1> <Bye,1> <World,1> <Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1> <Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>
<Hello 1, 1, 1>
Reduce
<Bye, 3> <Hadoop, 4> <Hello, 3> <World, 2>
<World 2>
Reduce
运行 Job
• 在 Hadoop 中一次计算任务称之为一 个 job, 可以通过一个 JobConf 对象 设置如何运行这个 job。