Storm高級原語(二) — DRPC


轉自:http://www.aboutyun.com/thread-8708-1-1.html

問題導讀
1、什么是Distributed RPC?
2、函數與函數之間靠什么來區分?
3、LinearDRPCTopologyBuilder的工作原理是什么?

Storm里面引入DRPC主要是利用storm的實時計算能力來並行化CPU密集型(CPU intensive)的計算任務。DRPC的storm topology以函數的參數流作為輸入,而把這些函數調用的返回值作為topology的輸出流。

DRPC其實不能算是storm本身的一個特性, 它是通過組合storm的原語stream、spout、bolt、 topology而成的一種模式(pattern)。本來應該把DRPC單獨打成一個包的, 但是DRPC實在是太有用了,所以我們我們把它和storm捆綁在一起。

概覽
Distributed RPC是由一個”DPRC服務器”協調(storm自帶了一個實現)。DRPC服務器協調:① 接收一個RPC請求 ② 發送請求到storm topology ③ 從storm topology接收結果 ④ 把結果發回給等待的客戶端。從客戶端的角度來看一個DRPC調用跟一個普通的RPC調用沒有任何區別。比如下面是客戶端如何調用RPC計算“reach”功能(function)的結果,reach方法的參數是: http://twitter.com

  1. DRPCClient client = new DRPCClient("drpc-host", 3772);
  2. String result = client.execute("reach", "http://twitter.com");

復制代碼


DRPC的工作流大致是這樣的(重要☆):



客戶端 給DRPC服務器發送要執行的 函數 (function)的名字,以及這個函數的參數。實現了這個函數的topology使用DRPCSpout從DRPC服務器接收函數調用流,每個函數調用被DRPC服務器標記了一個唯一的id。 這個topology然后計算結果,在topology的最后,一個叫做ReturnResults的bolt會連接到DRPC服務器,並且把這個調用的結果發送給DRPC服務器(通過那個唯一的id標識)。DRPC服務器用那個唯一id來跟等待的客戶端匹配上,喚醒這個客戶端並且把結果發送給它。

LinearDRPCTopologyBuilder
Storm自帶了一個稱作 LinearDRPCTopologyBuilder 的topology builder,它把實現DRPC的幾乎所有步驟都自動化了。這些步驟包括:
1、設置spout
2、把結果返回給DRPC 服務器
3、給bolt提供有限聚合幾組tuples的能力

來看一個簡單的例子,下面是一個把輸入參數后面添加一個”!”的DRPC topology的實現:
  1. public static class ExclaimBolt extends BaseBasicBolt {
  2.     public void execute(Tuple tuple, BasicOutputCollector collector) {
  3.         String input = tuple.getString(1);
  4.         collector.emit(new Values(tuple.getValue(0), input + "!"));
  5.     }
  6.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  7.         declarer.declare(new Fields("id", "result"));
  8.     }
  9. }

  10. public static void main(String[] args) throws Exception {
  11.     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
  12.     builder.addBolt(new ExclaimBolt(), 3);
  13.     // ...
  14. }
復制代碼


可以看出來, 我們需要做的事情非常的少。創建LinearDRPCTopologyBuilder的時候,你需要告訴它你要實現的DRPC 函數 (DRPC function)的名字。一個DRPC服務器可以協調很多函數,函數與函數之間靠函數名字來區分。你聲明的第一個bolt會接收一個兩維tuple,tuple的第一個字段是request-id,第二個字段是這個請求的參數。LinearDRPCTopologyBuilder同時要求我們topology的最后一個bolt發送一個形如[id, result]的二維tuple:第一個field是request-id,第二個field是這個函數的結果。最后所有中間tuple的第一個field必須是request-id。

在這里例子里面ExclaimBolt 簡單地在輸入tuple的第二個field后面再添加一個”!”,其余的事情都由LinearDRPCTopologyBuilder幫我們搞定:連接到DRPC 服務器 ,並且把結果發回。

本地模式DRPC
DRPC可以以本地模式運行,下面就是以本地模式運行上面例子的代碼:
  1. LocalDRPC drpc = new LocalDRPC();
  2. LocalCluster cluster = new LocalCluster();

  3. cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

  4. System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

  5. cluster.shutdown();
  6. drpc.shutdown();
復制代碼


首先你創建一個LocalDRPC對象,這個對象在進程內模擬一個DRPC服務器(這很類似於LocalCluster在進程內模擬一個Storm集群),然后創建LocalCluster對象在本地模式運行topology。LinearTopologyBuilder有單獨的方法來創建本地的topology和遠程的topology。在本地模式里面LocalDRPC對象不和任何端口綁定,所以我們的topology對象需要知道和誰交互,這就是為什么createLocalTopology方法接受一個LocalDRPC對象作為輸入的原因。

把topology啟動了之后,你就可以通過調用LocalDRPC對象的execute來調用RPC方法了。

遠程模式DRPC
在一個真實集群上面DRPC也是非常簡單的,有三個步驟:
1、啟動DRPC服務器
2、配置DRPC服務器的地址
3、提交DRPC topology到storm集群里面去。

我們可以通過“bin/storm drpc”命令來啟動DRPC服務器。
接着, 你需要讓你的storm集群知道你的DRPC 服務器 的地址。DRPCSpout需要這個地址從而可以從DRPC服務器來接收 函數 調用。這個可以配置在storm.yaml或者通過代碼的方式配置在topology里面。通過storm.yaml配置是這樣的:
  1. drpc.servers:
  2.   - "drpc1.foo.com"
  3.   - "drpc2.foo.com"
復制代碼


最后,你通過StormSubmitter對象來提交DRPC topology(這個跟你提交其它topology沒有區別)。如果要以遠程的方式運行上面的例子,用下面的代碼:
  1. StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
復制代碼

我們用createRemoteTopology方法來創建運行在真實集群上的DRPC topology。

一個更復雜的例子
上面的DRPC例子只是為了介紹DRPC概念的一個簡單的例子。下面讓我們看一個復雜的、確實需要storm的並行計算能力的例子, 這個例子計算twitter上面一個url的 reach 值。

一個URL的reach值是該URL對應的推文能到達(reach)的用戶數量,要計算一個URL的reach值,我們需要:
1、獲取所有推文里面包含這個URL的人(轉發過該URL的人)
2、獲取這些人的粉絲
3、把這些粉絲去重
4、獲取這些去重之后的粉絲個數 — 這就是reach值

一個簡單的reach計算可能會有成千上萬個數據庫調用,並且可能涉及到千萬數量級的粉絲用戶。這個確實可以說是CPU intensive的計算了,但你會看到,在storm上面來實現這個是非常非常的簡單。在單台機器上面,一個reach計算可能需要花費幾分鍾。而在一個storm集群里面,即時是最難的URL, 也只需要幾秒。

一個reach topolgoy的例子可以在這兒找到(storm-starter), reach  topology是這樣定義的:
  1. LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
  2. builder.addBolt(new GetTweeters(), 3);
  3. builder.addBolt(new GetFollowers(), 12)
  4.         .shuffleGrouping();
  5. builder.addBolt(new PartialUniquer(), 6)
  6.         .fieldsGrouping(new Fields("id", "follower"));
  7. builder.addBolt(new CountAggregator(), 2)
  8.         .fieldsGrouping(new Fields("id"));
復制代碼


這個topology分四步執行:
1、GetTweeters獲取轉發該推文的所有用戶。它接收輸入流: [id, url],它輸出:[id, tweeter]. 每個URL tuple會對應到很多tweeter tuple。
2、GetFollowers 獲取這些轉發者(tweeter)的粉絲。它接收輸入流: [id, tweeter], 它輸出: [id, follower]。當然,當某人關注的多個人都轉發了同一條推文時,follower tuple會存在重復,這就需要下一步的去重。
3、PartialUniquer 通過粉絲的id來group粉絲,這使得相同的粉絲會被引導到同一個task。因此不同的task接收到的粉絲是不同的 — 從而起到去重的作用。它的輸出流:[id, count] 即輸出這個task上統計的粉絲個數。
4、最后,CountAggregator 接收到所有的局部數量, 把它們加起來就算出了我們要的 reach 值。

我們來看一下PartialUniquer的實現:
  1. public class PartialUniquer extends BaseBatchBolt {
  2.     BatchOutputCollector _collector;
  3.     Object _id;
  4.     Set<String> _followers = new HashSet<String>();

  5.     @Override
  6.     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
  7.         _collector = collector;
  8.         _id = id;
  9.     }

  10.     @Override
  11.     public void execute(Tuple tuple) {
  12.         _followers.add(tuple.getString(1));
  13.     }

  14.     @Override
  15.     public void finishBatch() {
  16.         _collector.emit(new Values(_id, _followers.size()));
  17.     }

  18.     @Override
  19.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  20.         declarer.declare(new Fields("id", "partial-count"));
  21.     }
  22. }
復制代碼


當PartialUniquer在execute方法里面接收到一個粉絲tuple的時候, 它把這個tuple添加到當前request-id對應的Set里面去(利用Set元素不重復的特點進行去重)。

PartialUniquer繼承了BaseBatchBolt類。對於每個request-id,創建一個相應batch bolt的實例,並且Storm會在合適的時候清理這些實例。batch bolt提供了finishBatch的方法,該方法將在這個batch中的所有tuple被處理完之后調用。PartialUniquer僅發送一個tuple,包含當前這個request-id在這個task上的粉絲數量。

LinearDRPCTopologyBuilder的工作原理
1、DRPCSpout發射tuple: [args, return-info]。 return-info包含DRPC服務器的主機地址、端口以及當前請求的request-id(DRPC服務器生成)
2、DRPC Topology包含以下元素:
DRPCSpout
PrepareRequest(生成request-id, return info以及args)
CoordinatedBolt
JoinResult — 通過return info組合結果
ReturnResult — 連接到DRPC 服務器 並且返回結果
3、LinearDRPCTopologyBuilder是利用storm的原語來構建高層抽象的很好的例子。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com