sprak 容錯機制-checkpoint


   我們知道spark具有很強的數據容錯機制,為了保證RDD的完整性,RDD 通過血統(Lineage)的關系,它采用粗粒度的方式記錄了RDD的演變過程,這種方式相比於細粒度的方式確實限制了spark的運用場景,但是它卻提高了spark的性能。

當RDD在運行的過程中,出現錯誤導致數據不完整,這時spark會根據血統的關系,重新從頭計算RDD的方式來恢復數據,這樣在RDD的迭代次數比較少時,性能不會有太大差別,但是通常在使用spark執行機器學習算法時,往往需要迭代上百次,假如一個機器學習算法需要迭代RDD100次,但是在執行第100次時,spark出現故障,為保證數據的完整性,spark需要從頭開始重新計算RDD,這樣會導致spark的性能下降,為了應對這種情況,spark提供了另外一個機制-Checkpoint。

checkPoint可以將中間執行的RDD緩存到磁盤,當后面的RDD在執行時出現問題,spark運行機制就不必從頭重新計算RDD,只需在checkPoint點獲取數據重新計算后面的RDD即可,這樣對於迭代次數比較多的spark任務,可以很好的提高其運行性能。下面看一下checkPoint的spark源碼。

  /**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
在源碼的注釋中可以看出,checkpoint的RDD將會保存到通過SparkContext設置的CheckPoint的目錄下面,並且 會移除checkpoint的RDD之前所有的RDD, 還有就是checkpoint方法要在RDD執行action方法之前調用。 注釋的后半句也是相當重要, 強烈建議RDD持久化到內存中在進行checkpoint操作,不然在checkpoint操作時,將會重新計算RDD ,這樣會很影響性能。

溫馨提示:在進行checkpoint操作時 ,請先設置checkpoint保存的目錄

具體設置方式如:sc.setCheckpointDir("hdfs://data/checkpoint20180122")

否則將會如源碼所寫拋出checkpoint目錄在SparkContext中沒有設置異常。

 throw new SparkException("Checkpoint directory has not been set in the SparkContext")




注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com