Hadoop在Hadoop上运行MapReduce命令实验jar:WordCount.jar运行代码:root/……/hadoop/bin/hadoop jar jar包名称使用的包名称input(输入地址) output(输出地址)生成测试文件:echo -e "aa\tbb \tcc\nbb\tcc\tdd" > ceshi.txt输入地址:/data2/u_lx_data/qiandongjun/eclipse/crjworkspace/input输出地址:/data2/u_lx_data/qiandongjun/eclipse/crjworkspace/output将测试文件转入输入文件夹:Hadoop fs -put ceshi.txt /data2/u_lx_data/qiandongjun/eclipse/crjworkspace/input/ceshi.txt运行如下代码:hadoop jar /data2/u_lx_data/qiandongjun/eclipse/crjworkspace/WordCount.jar WordCount /data2/u_lx_data/qiandongjun/eclipse/crjworkspace/input/ceshi.txt/data2/u_lx_data/qiandongjun/eclipse/crjworkspace/outputHadoop架构1、HDFS架构2、MapReduce架构HDFS架构(采用了Master/Slave 架构)1、Client --- 文件系统接口,给用户调用2、NameNode --- 管理HDFS的目录树和相关的的文件元数据信息以及监控DataNode的状态。
信息以“fsimage”及“editlog”两个文件形势存放3、DataNode --- 负责实际的数据存储,并将数据定期汇报给NameNode。
每个节点上都安装一个DataNode4、Secondary NameNode --- 定期合并fsimage和edits日志,并传输给NameNode(存储基本单位为block)MapReduce架构(采用了Master/Slave 架构)1、Client --- 提交MapReduce 程序并可查看作业运行状态2、JobTracker --- 资源监控和作业调度3、TaskTracker --- 向JobTracker汇报作业运行情况和资源使用情况(周期性),并同时接收命令执行操作4、Task --- (1)Map Task (2)Reduce Task ——均有TaskTracker启动MapReduce处理单位为split,是一个逻辑概念split的多少决定了Map Task的数目,每个split交由一个Map Task处理Hadoop MapReduce作业流程及生命周期一共5个步骤1、作业提交及初始化。
JobClient将作业相关上传到HDFS上,然后通过RPC通知JobTracker,JobTracker 接收到指令后通过调度模块对作业初始化。
JobInProgressTaskInProgress2、 任务调度与监控。
一出现空白资源,JobTracker 会选择一个合适的任务使用空白资源。
任务调度器(双层结构),首先选择作业再选择作业中的任务(重点考虑数据本地行)。
当TaskTracker 或Task 失败时,转移计算任务 当某Task 计算远落后于其他时,再给一个Task ,取计算较快的结果。
3、 任务运行环境准备。
JVM 的启动和资源隔离,均由TaskTracker 实现。
(每个Task 启动一个独立的JVM )4、 任务执行。
准备好环境后,TaskTracker 便会启动Task 。
JobTracker 5、 作业完成。
MapReduce 编程接口体系接口在应用程序层和MapReduce 执行器之间,可以分为两层。
1、 Java API --- (1)InputFormat (2)Mapper (3)Partitioner (4)Reducer (5)OutputFormat用户只需(2),(4),其余hadoop 自带 2、 工具层,提供了4个编程工具包(1) JobControl (2) ChainMapper/ChainReducer (3) Hadoop Streaming (4) Hadoop Pipes为非java 编程 为c/c++用户MapReduce API — 序列化序列化是指将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程,主要作用:永久存储和进程间通信。
管理员自定义配置文件:由管理员设置,定义一些新的配置属性或者覆盖系统默认配置文件中的默认值。
Hadoop 会优先加载Common 的两个配置文件。
配置文件中有3个配置参数:name(属性名)、value(属性值) 和description(属性描述) 此外,Hadoop 为配置文件添加了两个新的特性:final 参数和变量扩展。
final 参数:如果管理员不想让用户程序修改❑ 某些属性的属性值,可将该属性的final 参数置为true 。
变量扩展:当读取配置文件时,如果某个属性存在对其他属性的引用,则 Hadoop 首跟踪作业运行状况 为每个Task ,跟踪每个任务的运行状态先会查找引用的属性是否为下列两种属性之一。
如果是,则进行扩展。
①其他已经定义的属性。
②Java 中System.getProperties() 函数可获取属性。
Java API——MapReduce 作业配置1、环境配置。
环境配置由Hadoop 自动添加。
主要:mapreddefault.xml及mapred-site.xml2、用户自定义配置。
用户自定义配置则由用户自己根据作业特点个性化定制而成。
InputFormatInputFormat 主要用于描述输入数据的格式,它提供以下两个功能。
①数据切分:按照某个策略将输入数据切分成若干个split,以便确定Map Task 个数以及对应的split。
——getSplits 方法②为Mapper 提供输入数据:给定某个split,能将其解析成一个个key/value 对。
getSplits 方法:它会尝试着将输入数据切分成numSplits个InputSplit。
InputSplit:支持序列化操作主要是为了进程间通信。
当数据传送给map时,map会将输入分片传送到InputFormat,InputFormat则调用方法getRecordReader()生成RecordReader,RecordReader再通过creatKey()、creatValue()方法创建可供map处理的<key,value>对。
简而言之,InputFormat()方法是用来生成可供map处理的<key,value>对的。
FileInputFormat该函数实现中最核心的两个算法是文件切分算法和host 选择算法。
(1)文件切分算法如果想让InputSplit尺寸大于block尺寸,则直接增大配置参数mapred.min.split.size即可。
新版API中,InputSplit划分算法不再考虑Map Task个数,而用MaxSize代替(mapred.max.split.size)(2)host 选择算法为此,FileInputFormat 设计了一个简单有效的启发式算法:首先按照rack 包含的数据量对rack 进行排序,然后在rack 内部按照每个node 包含的数据量对node 排序,最后取前N 个node 的host 作为InputSplit 的host 列表,这里的N 为block副本数。
当使用基于FileInputFormat 实现InputFormat 时,为了提高Map Task 的数据本地性,应尽量使InputSplit 大小与block 大小相同。
OutputFormat主要用于描述输出数据的格式,它能够将用户提供的key/value 对写入特定格式的文件中Mapper &Reducer以Mapper为例:新版API:参数封装到Context中(良好扩展性);不再继承JobConfigurable和Closeable,直接添加setup和cleanup进行初始化和清理工作;PartitionerMapReduce 提供了两个Partitioner 实现:HashPartitioner 和TotalOrderPartitioner。
基于TotalOrderPartitioner 全排序的效率跟key 分布规律和采样算法有直接关系;key 值分布越均匀且采样越具有代表性,则Reduce Task 负载越均衡,全排序效率越高。
TotalOrderPartitioner两个典型实例:TeraSort 和HBase 批量数据导入。
JobControl原理JobControl由两个类组成:Job和JobControlJob类:Job 类封装了一个MapReduce 作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新自己的状态。
如果一个作业的依赖作业失败,则该作业也会失败,后续所有作业均会失败。
JobControl类:JobControl 封装了一系列MapReduce 作业及其对应的依赖关系。
同时,它还提供了一些API 用于挂起、恢复和暂停该线程。
ChainMapper/ChainReducer主要为了解决线性链式Mapper 而提出的,在Map或Reduce阶段存在多个Mapper,它产生的结果写到最终的HDFS 输出目录中对于任意一个MapReduce 作业,Map 和Reduce 阶段可以有无限个Mapper,但Reducer 只能有一个。
Hadoop MapReduce 有一个约定,函数OutputCollector.collect(key, value) 执行期间不应改变key 和value 的值。
ChainMapper/ChainReducer实现原理关键技术点:修改Mapper 和Reducer 的输出流,将本来要写入文件的输出结果重定向到另外一个Mapper 中。
当用户调用addMapper 添加Mapper 时,可能会为新添加的每个Mapper 指定一个特有的JobConf,为此,ChainMapper/ChainReducer 将这些JobConf 对象序列化后,统一保存到作业的JobConf 中。
Hadoop工作流引擎在Hadoop 之上出现了很多开源的工作流引擎,主要可概括为两类:隐式工作流引擎和显式工作流引擎。