SparkConf加載與SparkContext創建(源碼閱讀二)


  緊接着昨天,我們繼續開搞了啊。。

  1、下面,開始創建BroadcastManager,就是傳說中的廣播變量管理器。BroadcastManager用於將配置信息序列化后的RDDJob以及ShuffleDependency等信息在本地存儲。緊接着blockManager的創建后創建。如下:

  隨之我們繼續深入看這個broadcastManager是怎么創建與實現的。

  可以看到,在initialize()初始化方法調用以后,通過utils.classForName反射生成工廠實例broadcastFactory,可以配置屬性spark.broadcast.factory指定,默認為org.apache.spark.broadcast.TorrentBroadcastFactory。廣播變量與非廣播變量都是以broadcastFactory工廠實現的。

  2、接下來,開始創建緩存管理器CacheManager。CacheManager用於緩存RDD某個分區計算后的中間結果,緩存計算結果發生在迭代計算的時候,那么它是怎么實現的呢。我們繼續深入~

  我們可以看到,在創建cacheManager對象的時候,傳入了blockManager,真正的緩存對象,依舊是blockManager,cacheManager是為blockManager做了代理。當迭代計算中,如果判斷使用了緩存,就會調用getOrCompute,從blockManager.get(key)獲取存儲的block,如果存在,則封裝new InterruptibleIterator返回,否則將重新loading partition,從CheckPoint中獲取數據,調用putInBlockManager方法將數據寫入緩存,進行InterruptibleIterator封裝。

  再次深入調用putInBlockManager的過程。發現傳入了partition的key,computedValues,storageLevel存儲等級,由BlockId,BlockStatus組成的元素。隨之它里面又搞了些事情。

  

  如果存儲級別不允許使用內存,那么直接調用BlockManager的putIterator方法。通過判斷putLevel.useMemory,也就是存儲級別允許存儲,那么就進行展開,如果展開成功則將數據存入內存,否則則寫入磁盤。

 

  然后繼續,我們看下,開始創建metricsSystem,主要是用於加載metrics.properties文件中的屬性配置,當所有的基本組件准備好后,開始創建SparkEnv.

  

 

最后,創建MetadataCleaner,它的實質是一個用TimeTask實現的定時器,用於清理persistentRdds中的過期內容,最后的最后創建SparkUI.

好了~今天就到這里,明日繼續,我去敲代碼咯~~~~

  


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com