《Hadoop 2.X HDFS源碼剖析》讀書筆記(RPC)


1. HDFS概述

1.1 體系結構

HDFS體系結構

HDFS是一個主從(Master/Slave)體系結構的分布式系統。Namenode是HDFS的Master節點,負責管理文件系統的命名空間(namespace),以及數據塊到具體Datanode節點的映射等信息。集群中的Datanode一般是一個節點一個,負責管理它所在節點的存儲。

1.2 RPC接口

Hadoop RPC接口主要定義在org.apache.hadoop.hdfs.protocol包和org.apache.hadoop.hdfs.server.protocol包中,包括以下幾個接口:

  1. ClientProtocol:定義了客戶端與名字節點間的接口,客戶端對文件系統的所有操作都需要通過這個接口,同時客戶端讀、寫文件等操作也需要先通過這個接口與Namenode協商之后,再進行數據塊的讀出和寫入操作。
  2. ClientDatanodeProtocol:客戶端與數據節點間的接口。定義的方法主要是用於客戶端獲取數據節點信息時調用,而真正的數據讀寫交互則是通過流式接口進行。
  3. DatanodeProtocol:數據節點通過這個接口與名字節點通信,同時名字節點會通過這個接口中方法的返回值向數據節點下發指令。這是名字節點與數據節點通信的唯一方式。數據節點會通過這個接口向名字節點注冊、匯報數據塊的全量以及增量的存儲情況。同時,名字節點也會通過這個接口中方法的返回值,將名字節點指令帶回該數據塊,根據這些指令,數據節點會執行數據塊的復制、刪除以及恢復操作。
  4. InterDatanodeProtocol:數據節點與數據節點間的接口,數據節點會通過這個接口和其他數據節點通信。這個接口主要用於數據塊的恢復操作,以及同步數據節點存儲的數據塊副本的信息。
  5. NamenodeProtocol:第二名字節點與名字節點間的接口。
  6. 其他接口:主要包括安全相關接口(RefreshAuthorizationPolicyProtocol、RefreshUserMappingsProtocol)、HA相關接口(HAServiceProtocol)等。

1.3 流式接口

在HDFS中,流式接口包括了基於TCP的DataTransferProtocol接口,以及HA架構中Active Namenode和Standby Namenode之間的HTTP接口。

  1. DataTransferProtocol:用來描述寫入或者讀出Datanode上數據的基於TCP的流式接口,HDFS客戶端與數據節點以及數據節點與數據節點之間的數據塊傳輸就是基於DataTransferProtocol接口實現的。
  2. Active Namenode和Standby Namenode之間的HTTP接口:在HA中,Standby Namenode只需定期將自己的命名空間寫入一個新的fsimage文件,然后就會向Active Namenode的ImageServlet發送HTTP GET請求/getimage?putimage=1。這個請求的URL中包括了新的fsimage文件的事務ID,以及Standby Namenode用於下載的端口和IP地址。Active Namenode接收到該請求后,會發起HTTP GET請求以下載fsimage文件。

2. HDFS主要流程

2.1 客戶端讀流程

HDFS客戶端讀流程

  1. 打開HDFS文件:HDFS客戶端首先調用DistributedFileSystem.open()方法打開HDFS文件,底層調用ClientProtocol.open(),返回HdfsDataInputStream
    對象用於讀取數據塊。HdfsDataInputStream是DFSInputStream的裝飾類,真正進行數據塊讀取操作的是DFSInputStream對象。

  2. 從NameNode獲取Datanode地址:在DFSInputStream的構造方法中,會調用ClientProtocol.getBlockLocations()方法向名字節點獲取該HDFS文件起始位置數據塊的位置信息。NameNode返回的數據塊的存儲位置是按照與客戶端的距離遠近排序的,所以DFSInputStream可以選擇一個最優的Datanode節點,然后與這個節點建立數據連接讀取數據塊。

  3. 連接到Datanode讀取數據塊:HDFS客戶端通過調用DFSInputStream.read()方法從這個最優的Datanode讀取數據塊,數據會以數據包(packet)為單位從數據節點通過流式接口傳送到客戶端。當到達一個數據塊的末尾時,DFSInputStream就會再次調用ClientProtocol.getBlockLocations()獲取文件下一個數據塊的位置信息,並建立和這個新的數據塊的最優節點之間的連接,然后HDFS客戶端就可以繼續讀取數據塊了。

  4. 關閉輸入流:當客戶端成功完成文件讀取后,會通過HdfsDataInputStream.close()方法關閉輸入流。


    注:當數據塊損壞時,HDFS客戶端就會通過ClientProtocol.reportBadBlocks()向NameNode匯報這個損壞的數據塊副本。

2.2 客戶端寫流程

HDFS客戶端寫流程

  1. 創建文件:HDFS客戶端寫一個新的文件時,會首先調用DistributedFileSystem.create()方法在HDFS文件系統中創建一個新的空文件。底層調用ClientProtocol.create()方法通知Namenode執行對應的操作,Namenode會首先在文件系統目錄樹中的指定路勁下添加一個新的文件,然后將創建新文件的操作記錄到editlog中。完成ClientProtocol.create()調用后,DistributedFileSystem.create()方法就會返回一個HdfsDataOutputStream對象,底層包裝了一個DFSOutputStream對象,真正執行寫數據操作的其實是DFSOutputStream。

  2. 建立數據流管道:獲取到DFSOutputStream對象后,HDFS客戶端就可以調用DFSOutputStream.write()方法來寫數據了。由於DistributedFileSystem.create()方法只是在文件系統目錄樹中創建了一個空文件,並沒有申請任何數據塊,所以DFSOutputStream會首先調用ClientProtocol.addBlock()向Namenode申請一個新的空數據塊,addBlock()方法會返回一個LocatedBlock對象,這個對象保存了存儲這個數據塊的所有數據節點的位置信息。獲得了數據流管道中所有數據節點的信息后,DFSOutputStream就可以建立數據流管道寫數據塊了。

  3. 通過數據流管道寫入數據:成功地建立數據流管道后,HDFS客戶端就可以向數據流管道寫入數據了。寫入DFSOutputStream中的數據會被緩存在數據流中,之后這些數據會被切分成一個個數據包(packet)通過數據流管道發送到所有數據節點。通過數據流管道依次寫入數據節點的本地存儲。每個數據包都有一個確認包,確認包會逆序通過數據流管道回到輸出流。輸出流在確認了所有數據節點已經寫入這個數據包之后,就會從對應的緩存隊列刪除這個數據包。當客戶端寫滿一個數據塊之后,會調用addBlock()申請新的數據塊,然后循環執行上述操作。

  4. 關閉輸入流並提交文件:當HDFS客戶端完成了整個文件中所有數據塊的寫操作之后,就可以調用close()方法關閉輸出流,並調用ClientProtocol.complete()方法通知Namenode提交這個文件中的所有數據塊,也就完成了整個文件的寫入流程。


寫文件時,數據流管道中的數據節點出現故障,則輸出流會進行下面操作來進行故障恢復。

  1. 輸出流中緩存的沒有確認的數據包會重新加入發送隊列。但輸出流會通過調用ClientProtocol.updateBlockForPipeline()方法為數據塊申請一個新的時間戳,然后重新建立管道。這種機制保證了故障Datanode上的數據塊會過期,然后在故障恢復后,由於數據塊的時間戳與Namenode元數據中的不匹配而被刪除。

  2. 故障數據節點會從輸出流管道中刪除,然后輸出流會通過調用ClientProtocol.getAdditionalDatanode()方法通知Namenode分配新的數據節點到數據流管道中。由於新添加的數據節點上並沒有存儲這個新的數據塊,這時HDFS客戶端會通過DataTransferProtocol通知數據流管道中的一個Datanode復制這個數據塊到新的Datanode上。

  3. 數據流管道重新建立之后,輸出流會調用ClientProtocol.updatePipeline()更新Namenode中的元數據。

2.3 客戶端追加寫流程

  1. 打開已有的HDFS文件:客戶端調用DistributedFileSystem.append()方法打開一個已有的HDFS文件,append()方法首先會調用ClientProtocol.append()方法獲取文件最后一個數據塊的位置信息,如果文件的最后一個數據塊已經寫滿則返回null。然后append()方法會調用DFSOutputStream.newStreamForAppend()方法創建到這個數據塊的DFSOutputStream輸出流對象,獲取文件租約,並將新構建的DFSOutputStream方法包裝為HdfsDataOutputStream對象,最后返回。
  2. 建立數據流捅到:DFSOutputStream類的構造方法會判斷文件最后一個數據塊是否已經寫滿,如果沒有寫滿,則根據ClientProtocol.append()方法返回的該數據塊的位置信息建立到該數據塊的數據流管道;如果寫滿,則調用ClientProtocol.addBlock()向Namenode申請一個新的空數據塊之后建立數據流管道。
  3. 通過數據流管道寫入數據:成功建立數據流管道后,HDFS客戶端就可以向數據流管道寫入數據(這部分和寫HDFS文件流程類似)。
  4. 關閉輸入流並提交文件這部分和寫HDFS文件流程類似

2.4 Datanode啟動、心跳以及執行名字節點指令流程

這里寫圖片描述

  1. Datanode啟動時會首先通過DatanodeProtocol.versionRequest()獲取Namenode的版本號以及存儲信息等,然后Datanode會對NameNode的當前軟件版本號和Datanode的當前軟件版本號進行比較,確保它們是一致的。

  2. 成功地完成握手操作后,Datanode會通過DatanodeProtocol.register()方法向Namenode注冊。Namenode接收到注冊請求后,會判斷當前Datanode的配置是否屬於這個集群,它們之間的版本號是否一致。

  3. 注冊成功之后,Datanode就需要將本地存儲的所有數據塊以及緩存的數據塊上報到Namenode,Namenode會利用這些信息重新建立內存中數據塊與Datanode之間的對應關系。

2.5 HA切換流程

HA切換流程

3. RPC

3.1 概述

RPC框架結構圖

  1. 通信模塊:傳輸RPC請求和響應的網絡通信模塊,可以基於TCP協議,也可以基於UDP協議,可以是同步,也可以是異步的。
  2. 客戶端Stub程序:服務器和客戶端都包括Stub程序。在客戶端,Stub程序表現的就像本地程序一樣,但底層卻會將調用請求和參數序列化並通過通信模塊發送給服務器。之后Stub程序等待服務器的響應信息,將響應信息反序列化並返回給請求程序。
  3. 服務器端Stub程序:在服務器端,Stub程序會將遠程客戶端發送的調用請求和參數反序列化,根據調用信息觸發對應的服務程序,然后將服務程序返回的響應信息序列化並發回客戶端。
  4. 請求程序:請求程序會像調用本地方法一樣調用客戶端Stub程序,然后接收Stub程序返回的響應信息。
  5. 服務程序:服務器會接收來自Stub程序的調用請求,執行對應的邏輯並返回執行結果。

3.2 Hadoop RPC的使用

通信模塊

Hadoop實現了org.apache.hadoop.ipc.Client類以及org.apache.hadoop.ipc.Server類提供的基於TCP/IP Socket的網絡通信功能。

同時為了RPC機制更加健壯,Hadoop RPC允許客戶端配置使用不同的序列化框架(例如protobuf、avro)。

服務器端,為了提高性能,Server類采用了Java NIO提供的基於Reactor設計模式的事件驅動I/O模型,當Server完整地從網絡接收一個RPC請求后,會調用call()方法響應這個請求。

客戶端Stub程序

客戶端的Stub可以看作是一個代理對象,它會將請求程序的RPC調用序列化,並調用Client.call()方法將該請求發送給遠程服務器。

Hadoop定義了RpcEngine接口抽象使用不同序列化框架的RPC引擎,該接口包括兩個重要方法:

  1. getProxy():客戶端會調用RpcEngine.getProxy()方法獲取一個本地接口的代理對象,然后在這個代理對象上調用本地接口的方法(RpcInvocationHandler)。這個對象會將請求序列化,並調用Client.call()發送請求,同時RpcInvocationHandler會將響應信息反序列化並返回給調用程序。
  2. getServer():該方法用於產生一個RPC Server對象,服務器會啟動這個Server對象監聽客戶端發來的請求。成功從網絡接收請求數據后,Server對象會調用RpcInvoker(在RpcEngine的實現類中定義)對象處理這個請求。

服務器端Stub程序

服務器端Stub程序會將通信模塊接收的數據反序列化,然后調用服務程序對應的方法響應這個RPC請求。

3.3 Client發送請求與接收響應流程

Client發送請求與接收響應流程

  1. Client.call()方法將RPC請求封裝成一個Call對象,Call對象中保存了RPC調用的完成標志、返回值信息以及異常信息;隨后,Client.call()方法會創建一個Connection對象,Connection對象用於管理Client與Server的Socket連接。
  2. 用ConnectionId作為key,將新建的Connection對象放入Client.connections字段中保存(對於Connection對象,由於涉及了與Server建立Socket連接,會比較耗費資源,所以Client類使用一個HashTable對象connections保存那些沒有過期的Connection,如果可以復用,則復用這些Connection對象);以callId作為key,將構造的Call對象放入Connection.calls字段中保存。
  3. Client.call()方法調用Connection.setupIOstreams()方法建立與Server的Socket連接。setupIOstreams()方法還會啟動Connection線程,Connection線程會監聽Socket並讀取Server發回的響應信息。
  4. Client.call()方法調用Connection.sendRpcRequest()方法發送RPC請求到Server。
  5. Client.call()方法調用Call.wait()在Call對象上等待,等待Server發回響應信息。
  6. Connection線程收到Server發回的響應信息,根據響應信息中攜帶的信息找到對應的Call對象,然后設置Call對象的返回值字段,並調用call.notify()喚醒調用Client.call()方法的線程讀取Call對象的返回值。

3.4 Server接收請求與發送響應流程

Server接收請求與發送響應流程

Server類的設計是一個典型的多線程加多Reactor的網絡服務器結構。Server類處理RPC請求的流程如下:

  1. Listener線程acceptSelector在ServerSocketChannel上注冊OP_ACCEPT事件,並且創建readers線程池。每個Reader的readSelector此時並不監聽任何Channel。
  2. Client發送Socket連接請求,觸發Listener的acceptSelector喚醒Listener線程。
  3. Listener調用ServerSocketChanel.accept()創建一個新的SocketChannel。
  4. Listener從readers線程池中挑選一個線程,並在Reader的readSelector上注冊OP_READ事件。
  5. Client發送RPC請求數據包,觸發Reader的selector喚醒Reader線程。
  6. Reader從SocketChannel中讀取數據,封裝成Call對象,然后放入共享隊列CallQueue中。
  7. 最初,handlers線程池中的線程都在CallQueue(調用BlockingQueue.take())上阻塞,當有Call對象被放入后,其中一個Handler線程被喚醒,然后根據Call對象的信息調用BlockingService對象的callBlockingMethod()方法。隨后,Handler嘗試將響應寫入SocketChannel。
  8. 如果Handler發現無法將響應完全寫入SocketChannel時,將在Responder的respondSelector上注冊OP_WRITE事件。當Socket恢復正常時,Responder將被喚醒,繼續寫響應。當然,如果一個Call響應在一定時間內都無法被寫入,則會被Responder移除。

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
  © 2014-2022 ITdaan.com