
Apache Spark Source Code Walkthrough, Part 2

超人学院 — Apache Spark Source Code Walkthrough: Function Call Analysis During Task Execution

Overview

This article explains how the business logic of a task executed inside TaskRunner is actually invoked. It also tries to make clear where a running task obtains its input data, where the processed results are returned to, and how they are returned.

Preparation

1. Spark has been installed.
2. Spark is running in local mode or local-cluster mode.

local-cluster mode

local-cluster mode, also known as pseudo-distributed mode, can be started with the following command:

    MASTER=local-cluster[1,2,1024] bin/spark-shell

The three numbers in [1,2,1024] are, respectively, the executor count, the number of cores per executor, and the memory size in MB; the memory size should not be smaller than the default 512 MB.

Driver Program Initialization

The main source files involved in the initialization process:

1. SparkContext.scala — the entry point of the whole initialization process
2. SparkEnv.scala — creates BlockManager, MapOutputTrackerMaster, ConnectionManager and CacheManager
3. DAGScheduler.scala — the entry point of job submission; the key piece that splits a Job into stages
4. TaskSchedulerImpl.scala — decides how many tasks each stage may run and on which executor each task runs
5. SchedulerBackend — for the simplest single-machine (local) mode, see LocalBackend.scala; for cluster mode, see SparkDeploySchedulerBackend

Initialization steps in detail

Step 1: Build a SparkConf from the constructor arguments, then create a SparkEnv from the SparkConf. SparkEnv contains the following key components:

1. BlockManager
2. MapOutputTracker
3. ShuffleFetcher
4. ConnectionManager

    private[spark] val env = SparkEnv.create(
      conf,
      "<driver>",
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port").toInt,
      isDriver = true,
      isLocal = isLocal)
    SparkEnv.set(env)

Step 2: Create the TaskScheduler, choosing the SchedulerBackend that matches the Spark run mode, and start the TaskScheduler. This step is crucial.

    private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
    taskScheduler.start()

TaskScheduler.start starts the corresponding SchedulerBackend and sets up a timer that periodically checks for speculatable tasks:

    override def start() {
      backend.start()

      if (!isLocal && conf.getBoolean("spark.speculation", false)) {
        logInfo("Starting speculative execution thread")
        import sc.env.actorSystem.dispatcher
        sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
              SPECULATION_INTERVAL milliseconds) {
          checkSpeculatableTasks()
        }
      }
    }

Step 3: Create the DAGScheduler, passing in the TaskScheduler instance created in the previous step, and start it:

    @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
    dagScheduler.start()

Step 4: Start the web UI:

    ui.start()

RDD transformations

The simplest word count is again used to illustrate how RDDs are transformed:

    sc.textFile("README.md").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

This single short line triggers a rather involved chain of RDD transformations. Each step and its result are explained below.

Step 1: val rawFile = sc.textFile("README.md")

textFile first creates a HadoopRDD and then, through a map operation, produces a MappedRDD. Running the statement in spark-shell confirms this analysis:

    scala> sc.textFile("README.md")
    14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
    14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
    14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
    14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms
    14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms
    res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:13

Step 2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap converts the MappedRDD into a FlatMappedRDD:

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
      new FlatMappedRDD(this, sc.clean(f))

Step 3: val wordCount = splittedText.map(word => (word, 1))

Each word is turned into a key-value pair, and the FlatMappedRDD from the previous step becomes a MappedRDD again.

Step 4: val reduceJob = wordCount.reduceByKey(_ + _) — this is the most involved step. The operations used in steps 2 and 3 are all defined in RDD.scala, yet reduceByKey is nowhere to be found there.
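To watch this chain being built, each step can be bound to a value in spark-shell and the lineage inspected with RDD.toDebugString. This is a minimal sketch rather than part of the original walkthrough: it assumes a README.md in the working directory and the predefined SparkContext named sc, and the concrete RDD class names printed depend on the Spark version (newer releases replace MappedRDD and FlatMappedRDD with MapPartitionsRDD).

    // Run inside spark-shell, where the SparkContext `sc` is predefined.
    // Bind each transformation so its intermediate RDD can be inspected.
    val rawFile      = sc.textFile("README.md")             // HadoopRDD -> MappedRDD
    val splittedText = rawFile.flatMap(_.split(" "))         // -> FlatMappedRDD
    val wordCount    = splittedText.map(word => (word, 1))   // -> MappedRDD of pairs
    val reduced      = wordCount.reduceByKey(_ + _)          // -> ShuffledRDD wrapped in MapPartitionsRDD

    // toDebugString prints the lineage, one parent RDD per line,
    // mirroring the transformation chain described above.
    println(reduced.toDebugString)

    // Nothing has been computed yet; an action such as collect() triggers the job.
    reduced.collect().take(10).foreach(println)

Because all of these are lazy transformations, nothing is computed until an action is called, which is exactly why the whole chain already exists before job submission.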

reduceByKey is defined in the source file PairRDDFunctions.scala. A careful reader will ask: reduceByKey is not a member of MappedRDD, so how can it be called on one? What happens behind the scenes is an implicit conversion, which wraps the MappedRDD in a PairRDDFunctions:

    implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
      new PairRDDFunctions(rdd)

Implicit conversion is a Scala language feature; searching for the keyword "scala implicit method" turns up plenty of articles that cover it in detail.
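As a self-contained illustration of the mechanism (toy code, not taken from Spark), the hypothetical RichText wrapper below adds a wordCount method to plain strings through an implicit conversion, in the same way rddToPairRDDFunctions adds reduceByKey to an RDD of pairs:

    import scala.language.implicitConversions

    object ImplicitDemo {
      // A wrapper class that provides the extra method.
      class RichText(val s: String) {
        def wordCount: Map[String, Int] =
          s.split("\\s+").filter(_.nonEmpty)
            .groupBy(identity)
            .map { case (w, occurrences) => (w, occurrences.length) }
      }

      // The implicit conversion: when String lacks a method, the compiler
      // looks for an in-scope conversion whose result type supplies it.
      implicit def stringToRichText(s: String): RichText = new RichText(s)

      def main(args: Array[String]): Unit = {
        // "hello hello spark" has no wordCount method of its own; the compiler
        // silently rewrites this to stringToRichText("hello hello spark").wordCount.
        println("hello hello spark".wordCount)   // Map(hello -> 2, spark -> 1)
      }
    }

The same lookup happens when reduceByKey is called on an RDD[(K, V)]: the compiler finds rddToPairRDDFunctions in scope and inserts the wrapping call for you.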

Next, take a look at the definition of reduceByKey:

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
      reduceByKey(defaultPartitioner(self), func)
    }

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializerClass: String = null): RDD[(K, C)] = {
      if (getKeyClass().isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitionsWithContext((context, iter) => {
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else if (mapSideCombine) {
        val combined = self.mapPartitionsWithContext((context, iter) => {
          aggregator.combineValuesByKey(iter, context)
        }, preservesPartitioning = true)
        val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
          .setSerializer(serializerClass)
        partitioned.mapPartitionsWithContext((context, iter) => {
          new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        // Don't apply map-side combiner.
        val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
        values.mapPartitionsWithContext((context, iter) => {
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      }
    }

reduceByKey eventually calls combineByKey. Inside combineByKey the PairRDDFunctions wrapper produces a ShuffledRDD, and once mapPartitionsWithContext is applied, the ShuffledRDD is in turn wrapped in a MapPartitionsRDD. The log output confirms this analysis:

    res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:13

RDD transformation summary

To sum up, the whole transformation chain is:

    HadoopRDD -> MappedRDD -> FlatMappedRDD -> MappedRDD -> PairRDDFunctions -> ShuffledRDD -> MapPartitionsRDD

It is a long chain, and every one of these transformations happens before the job is even submitted.
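To make the createCombiner / mergeValue / mergeCombiners trio concrete, the same word count can also be written against combineByKey directly. This is a sketch, assuming a running spark-shell with sc defined; it uses the three-argument combineByKey overload, which falls back to the default partitioner, and should produce the same result as reduceByKey(_ + _):

    // Same word count expressed through combineByKey directly,
    // to show what reduceByKey(_ + _) expands into conceptually.
    val wordPairs = sc.textFile("README.md")
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    val counts = wordPairs.combineByKey[Int](
      (v: Int) => v,                   // createCombiner: first value seen for a key starts the combiner
      (c: Int, v: Int) => c + v,       // mergeValue: fold further values into the per-partition combiner
      (c1: Int, c2: Int) => c1 + c2)   // mergeCombiners: merge per-partition combiners after the shuffle

    counts.take(5).foreach(println)

With mapSideCombine enabled (the default), the first two functions run before the shuffle and mergeCombiners merges the partial sums afterwards, which is why reduceByKey passes the same function for both merge steps.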
