当前位置:文档之家› 大数据并行处理方法与举例

大数据并行处理方法与举例

大数据并行处理方法与举例1、介绍1.1 背景互联网行业中,日常运营中生成、累积的用户网络行为数据等大数据规模相当庞大,以至于不能用G或T来衡量。

大数据到底有多大?一组名为“互联网上一天”的数据告诉我们,一天之中,互联网产生的全部内容可以刻满1.68亿张DVD;发出的邮件有2940亿封之多(相当于美国两年的纸质信件数量);发出的社区帖子达200万个(相当于《时代》杂志770年的文字量);卖出的手机为37.8万台,高于全球每天出生的婴儿数量37.1万……而到了2020年,全世界所产生的数据规模将达到今天的44倍。

可以说,人类社会已经步入了大数据时代。

然而,大数据用现有的一般技术又难以处理,并且海量的非结构化数据带来的并不仅仅是存储、传输的问题,做好海量非结构化数据分析以及快速处理以更好的服务客户、提高业务效率已经成为紧迫的问题。

伴随着数据规模的爆炸式增长,数据并行分析处理技术也在不断进行着改进,以满足大数据处理对实时性的需求。

数据并行处理(Data Parallel Processing)是指计算机系统能够同时执行两个或更多个处理机的一种计算方法。

并行处理的主要目的是节省大型和复杂问题的解决时间。

为使用并行处理,首先需要对程序进行并行化处理,也就是说将工作各部分分配到不同处理机中。

当下比较流行的大数据分布式计算应用最具有代表性的有:MapReduce、Spark和GraphX。

下面详细介绍这三种应用的基本原理及应用例子。

1.2 MapReduce2006年由Apache基金会开发的Hadoop项目,由分布式文件系统HDFS和MapReduce 工作引擎所组成。

其中MapReduce采用“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。

简单地说,MapReduce就是“任务的分解与结果的汇总”。

在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。

一个Hadoop集群中只有一台JobTracker。

在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。

MapReduce极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

MapReduce在企业中被非常广泛地利用,包括分布grep、分布排序、web连接图反转、每台机器的词矢量、web访问日志分析、反向索引构建、文档聚类、机器学习、基于统计的机器翻译等等。

在谷歌,超过一万个不同的项目已经采用MapReduce来实现,包括大规模的算法图形处理、文字处理、数据挖掘、机器学习、统计机器翻译以及众多其他领域。

1.3 SparkSpark是2009年由UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

Spark项目现在被使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。

1.4 GraphX鉴于业界对分布式图计算的需求日益见涨,Spark开始独立一个分支Graphx-Branch,作为独立的图计算模块,借鉴GraphLab,开始设计开发GraphX。

分布式图计算框架的目的,是将对于巨型图的各种操作包装为简单的接口,让分布式存储、并行计算等复杂问题对上层透明,从而使复杂网络和图算法的工程师,更加聚焦在图相关的模型设计和使用上,而不用关心底层的分布式细节。

为了实现该目的,需要解决两个通用问题:图存储模式和图计算模式。

1.5示例今天针对以上介绍的三种大数据并行处理应用介绍三个具体的例子。

Example 1: MapReduce - WordCount单频统计是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的"src/examples"目录下找到。

单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数。

图1是一个具体的例子。

图1 WordCount的MapReduce执行流程图1表示有两个txt文档作为输入,第一个文档里面保存了“Hello World”两个单词,第二个文档里面保存了“Hello Hadoop”两个单词,那么经过WordCount程序的处理,输出结果为每个单词出现的次数,即:Hello 2World 1Hadoop 1Example 2: Spark - Pi圆周率表示圆的周长与直径的比例,通常用π表示。

在Spark程序里面可以这么实现:假设在一个2×2的正方形里面掷骰子,那么骰子每次落在该正方形的内切圆的概率都是π/4,如图2所示。

这样只要统计出总共落在内切圆的次数,再除以总的掷骰子的次数,所得的值就近似等于π/4,把该值乘以4就得到π。

我们知道,掷的次数越多,所得到的圆周率就越准确。

在Spark里面可以假设有N个骰子往N个正方形里面同时掷,每个骰子都可以掷很多的次数,最后只需要把总的落在内切圆里面的次数相加,除以N个骰子掷出的总次数,最后再乘以4,就能得到比较精确的π值。

图2 在2×2的正方形里面掷骰子落在内切圆的概率为π/4使用Spark程序迭代10次,每次掷100000次骰子,算出来的圆周率为:π = 3.14044Example 3: GraphX - PageRankPageRank:网页排名,是一种根据网页之间相互的超链接计算的技术,而作为网页排名的要素之一,以Google公司创办人拉里·佩奇(Larry Page)之姓来命名。

Google用它来体现网页的相关性和重要性,在搜索引擎优化操作中是经常被用来评估网页优化的成效因素之一。

PageRank通过网络浩瀚的超链接关系来确定一个页面的等级。

Google把从A页面到B页面的链接解释为A页面给B页面投票,Google根据投票来源(甚至来源的来源,即链接到A页面的页面)和投票目标的等级来决定新的等级。

简单的说,一个高等级的页面可以使其他低等级页面的等级提升。

在互联网上,如果一个网页被很多其它网页所链接,说明它受到普遍的承认和依赖,那么它的排名就相对较高。

图3网页之间的关联关系假设现在有4个网页:URL 1 、 URL 2 、URL 3 、 URL 4 ,它们之间的指向关系如图3所示。

我们知道任何一个图都可以用矩阵来表示,比如图3就可以用以下矩阵来表示:[0 0 0 01 0 1 01 1 0 11 1 0 0] 由此,图的并行化处理问题在某种程度上就被转化为矩阵的并行化运算问题。

而对于PageRank 算法,有以下两个特点:1、网页和网页之间的关系用图来表示;2、网页URL i 和网页URL j 之间的连接关系表示任意一个用户从网页URL i 到转到网页URL j 的可能性(概率)。

我们设所有网页之间的连接关系用矩阵G 来表示,所有网页的排名用矩阵R 来表示,则有:R i =G ×R i−1。

初始假设,所有网页排名都是1/N ,N 表示网页的总数,即有R 0=(1N ,1N ,…,1N )−1,显然通过简单的矩阵运算可以得到R 1,R 2,…,可以证明 R i 最终会收敛,即R i 无限趋近于 R ,此时R =G ×R 。

因此,当两次迭代的结果 R i 和 R i−1 之间的差异非常小,接近于0的时候,迭代运算结束。

由于现实中网页之间链接的数量相比互联网的规模非常稀疏,因此计算网页的网页排名也需要对零概率或者小概率事件进行平滑处理,网页的排名是个一维向量,对它的平滑处理只能利用一个小的常数α,上述公式可以演变为:R i =[αN ×I +(1−α)×G]×R i−1(其中I 是单位矩阵)。

对于上述例子,为了规范化输入网页之间的连接信息,我们用“URL i URL j”表示网页URL i 指向网页URL j,则这4个网页可以用以下数对作为输入:URL2 URL1URL3 URL1URL4 URL1URL3 URL2URL4 URL2URL2 URL3URL3 URL4我们通过GraphX调用PageRank算法,分三个阶段计算:1)提取页面与页面的关系;2)用PageRank迭代计算每个页面的rank值;3)根据页面的rank进行降幂排序。

这个例子经过10次迭代得到的结果为(保留小数点后5位):URL1∶ 0.47287URL2∶ 0.33166URL3∶ 0.29121URL4∶ 0.232652、平台部署与搭建2.1部署介绍本次部署使用了3个节点,每个节点都是虚拟机(VM),每个虚拟机有20个VCPU,50GB 内存,以及1TB的硬盘。

当然实际上该部署方法不限于物理机或者是虚拟机,并且对于机器配置没有强制性的要求。

3个节点的角色即是1个master和2个slaves,master同时是Namenode、ResourceManager,slave又同时是Datanode、NodeManager。

安装过程需要遵守先安装hadoop,然后安装scala,再安装spark及GraphX的流程。

2.2硬件需求与准备工作2.2.1 OS要求Ubuntu 12.04 64bit2.2.2 ip地址分配master 219.243.*.170slave1 219.243.*.171slave2 219.243.*.1722.2.3设置root访问权限及修改hostname#sudo passwd rootyourpasswd#sudo vi /etc/hostnameMaster#sudo vi /etc/hosts219.243.*.170 master219.243.*.171 slave1219.243.*.172 slave22.2.4 VM网络配置#sudo vi /etc/network/interfacesauto eth0iface eth0 inet staticaddress 219.243.*.*netmask 255.255.255.192gateway 219.243.*.**dns-nameservers 202.38.120.*2.2.5扩展硬盘(大于200GB),设置开机挂载#vi /etc/fstab/dev/xvdb /spark ext4 defaults 1 02.2.6关闭防火墙#ufw disable2.2.7 让master可以无密码登陆slaves#apt-get install rsync#ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa#cat ~/.ssh/id_dsa.pub >>~/.ssh/authorized_keys把公钥复制到slaves相同的目录#scp ~/.ssh/authorized_keys slave1:~/.ssh/#scp ~/.ssh/authorized_keys slave2:~/.ssh/验证是否成功#ssh slave1#ssh slave22.3搭建详细步骤2.3.1 Java 安装与配置#apt-get install java-7-openjdk#vi /etc/profileexport JAVA_HOME=/usr/lib/jvm/java-7-oracleexport JRE_HOME=/usr/lib/jvm/java-7-oracle/jreexport PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATHexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar #source /etc/profile2.3.2安装hadoop 2.4.01)下载及解压#wget /apache/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz#tar zxvf Hadoop-2.4.0.tgz2)修改hadoop-env.sh和yarn-env.shexport JAVA_HOME= /usr/lib/jvm/java-7-oracle3)修改core-site.xml<configuration><property><name></name><value>hdfs://master:9000</value></property><property><name>hadoop.tmp.dir</name><value>/spark/tmp</value></property><property><name>hadoop.proxyuser.hduser.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.hduser.groups</name><value>*</value></property></configuration>4)修改mapred-site.xml, 将mapred-site.xml.template重命名成mapred-site.xml <configuration><property><name>mapreduce.cluster.local.dir</name><value>/spark/local</value></property><property><name>mapreduce.cluster.system.dir</name><value>/spark/system</value></property><property><name></name><value>yarn</value></property><property><name>yarn.log-aggregation-enable</name><value>true</value></property><property><name>mapreduce.jobhistory.address</name><value>master:10020</value></property><property><name>mapreduce.jobhistory.webapp.address</name><value>master:19888</value></configuration>【说明】相比于Hadoop1.0,用户无需再配置mapred.job.tracker,这是因为JobTracker已变成客户端的一个库,他可能被随机调度到任何一个slave上,也就是它的位置是动态生成的。

相关主题