Spark資源調度與任務調度(standalone模式)


說明:本文所講的Spark資源調度與任務調度是standalone模式下的調度,其它模式下的調度(如Yarn、Mesos等)暫不涉及。
我們結合具體的應用案例——WordCount.scala 來詳細說明Spark是如何進行資源調度與任務調度的。WordCount.scala代碼:
package com.beijing.scala.spark.operator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WC {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)

val linesRDD = sc.textFile("cs")
val wordsRDD = linesRDD.flatMap { _.split(" ") }
val pairRDD = wordsRDD.map { (_,1) }
val resultRDD = pairRDD.reduceByKey(_+_)
resultRDD.sortBy(_._2).foreach(println)
sc.stop()
}
}
這個案例是計算某一文本中各個單詞出現的次數,其具體的運算邏輯不用關注,我們關注的是在該應用提交到Spark集群的時候,Spark是如何進行資源和任務的調度的。
提交應用到Spark集群的命令:
./spark-submit --master spark://hadoop1:7077 --deploy-mode client --class com.beijing.scala.spark.operator.WC ../lib/WC.jar
命令說明:
     spark-submit  : 是SPARK_HOME/bin下的一個命令
     --master : master主機的地址
     --deploy-mode : 部署模式。分為client模式 與 cluster模式。兩種模式的區別在后文會有介紹。
     --class : 應用的入口。   此處是WC類中的main()方法。
     ../lib/WC.jar :  jar包的路徑

     在使用上述命令提交應用后,Spark會執行一系列的操作,來完成資源調度與任務調度的工作。下面我們跟隨代碼一步步的拆分,詳細的了解每一行代碼會執行什么操作。首先,我們通過圖示來了解一下Spark集群的情況:

在我的Spark集群中,一共有三台主機:hadoop1~hadoop3,其中hadoop1作為master 節點,hadoop2、hadoop3作為Worker節點,兩台Worker節點功能相同,為了圖示清晰,在圖示中省略了hadoop3。hadoop4是Spark集群的客戶端。
資源調度與任務調度的詳細流程:
1、Spark集群啟動時, 所有的Worker節點會想master節點上報自身的資源信息,這樣master節點就掌握了集群中每一個worker節點有多少資源可供使用。
2、在客戶端(hadoop4)通過spark-submit命令提交任務時,Spark會根據路徑找到Application的入口函數,並依次執行函數語句:
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
    在創建SparkContext對象的過程中,Spark會通過底層代碼創建兩個對象: DAGScheduler(負責切分job,划分stage) 和 TaskScheduler(負責task分發) 。 TaskScheduler對象創建完畢后,會向master為當前的Application申請資源。
3、master(hadoop1節點) 接收到來自TaskScheduler的請求后,會在Workers上啟動一批Executor(用於計算的進程)。默認情況下,每一個Worker會為當前的Application創建一個Executor進程。
4、master節點繼續向worker節點發送消息,啟動Executors。這時,每個Executor進程會創建一個ThreadPool,以供使用。
5、Executor啟動完成后,會反向注冊給TaskScheduler,這樣TaskScheduler對象就會持有一個可用的Executor的列表。
6、跟隨代碼繼續往下走,會遇到flatMap、map、reduceByKey、foreach,其中前三個算子是transformation類的算子,foreach是Action類算子。Transformation類的算子都是懶執行,Action類的算子都是立即執行。所以,直到執行到foreach時,才會觸發運算。這時,由DAGSchedulor類對job進行切割(每遇到一個Action類算子,就算一個job),划分stage。划分的依據是根據RDD的寬窄依賴划分的。
7、DAGScheduler 以TaskSet的形式,將task提交給TaskScheduler。
8、TaskScheduler會遍歷每一個task , 然后將task發送給Executor執行(由於TaskScheduler保留有Executor的列表,所以TaskScheduler知道task應該發送給哪些Executor)。在task發送的時候,spark會自動考慮到數據本地化的問題(即在數據所在的節點上執行task)。
9、遇到sc.stop(),釋放所有資源。


補充:
Spark 與 MapReduce 的區別(Spark為什么比MR運算速度快)?
1、Spark是基於內存迭代的,而MR是基於磁盤迭代的;
2、Spark的計算模式是pipeline模式, 即1+1+1=3
     MR的計算模式是: 1+1=2 , 2+1=3
3、Spark是可以進行資源復用的,而MR無法進行資源復用
     注:Spark的資源復用指的是:同一個Application中,不同的job之間可以進行資源復用,而Application之間是無法進行資源復用的。
4、Spark是粗粒度的資源調度
      MR是細粒度的資源調度
注:粗粒度與細粒度:
粗粒度(典型:spark):在Application執行之前,將所有的資源全部申請完畢。申請成功后,再進行任務的調度,當所有的任務執行完畢后,才會釋放這部分資源。
細粒度(典型:MR):在Application執行之前,不需要將資源全部申請好,執行進行任務調度。在每一個task執行之前,自己去申請資源,資源申請成功后,才會執行任務。當任務執行完成后,立即釋放這部分資源。
優缺點:
     Spark的優點即是MR的缺點,Spark的缺點是MR的優點。
                                      Spark                                                 MR
優點 在每一個task執行之前,不需要自己去申請資源,task啟動的時間短,相應的stage、job和application耗費的時間變短 一個task執行完畢后,會立即釋放這部分資源,集群的資源可以充分利用
缺點 在所有的task執行完畢后才會釋放資源,導致集群的資源無法充分利用 每一個task在執行之前,需要自己去申請資源,這樣就導致task啟動時間變長,進而導致stage、job、application的運行時間變長




Spark部署模式:client模式 與 cluster模式:
     以client模式部署時, Driver進程會在客戶端創建,因此可以在client上跟蹤到task的運行情況 以及 task的運行結果。
     以cluster模式部署時,Driver進程會在集群中的某一台Worker主機上創建,因此在客戶端上無法跟蹤到task的運行情況和運行結果。task的運行情況和運行結果可以通過web的方式加以查看(spark://hadoop1: 8080,選擇已完成的task,可查看詳情)。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com