Storm集群組件和編程模型




Storm工作原理:

     Storm是一個開源的分布式實時計算系統,常被稱為流式計算框架。什么是流式計算呢?通俗來講,流式計算顧名思義:數據流源源不斷的來,一邊來,一邊計算結果,再進入下一個流。例如一般金融系統一直不斷的運行,金融交易、用戶所有行為都記錄進日志里,日志分析出網站運維、獵戶信息;海量數據使得單節點處理不過來,所以就用到分布式計算機型,storm 是其中的典型代表之一,一般應用場景是:中間使用一個消息隊列系統如kafka,先將消息緩存起來,storm 中有很多的節點,分布式並行運行處理程序,進行數據處理。

    只要不是人為干預,storm 就一直實時不斷地進行數據處理。值得注意的是:並不是storm去處理,而是它可以將我們程序的很多jar包,業務程序,同時放到不同的服務器中並發的運行, 最終得到的結果就是不同系統的海量數據就會分散到不同的服務器中並發的進行處理,負載能力很強。 所以真正進行數據處理的是我們寫好的數據處理程序,storm的強大作用之一就是它為這些程序提供了運行溫床,將應用程序上傳到storm 集群中,在多台機器上並發運行,這樣就可以擴展程序的負載處理能力實現流式計算。


Storm 集群組件:

集群角色:

Nimbus:集群主節點,主要負責任務分配、響應客戶端提交topology請求以及任務失敗的調度

Supervisor:集群從節點,主要負責啟動、停止業務邏輯組件程序進程

    主從節點之間通過zookeeper集群進行連接,主從節點之間是fail-fastjava的一種錯誤機制)、無狀態的,主從節點的狀態信息均保存到zookeeper中或者本地硬盤里。這樣的好處就在於,哪怕是主節點kill掉了,storm會自動起一個備份主節點,因為無狀態的關系,所以任意一個節點都可以充當Nimbus一角。這種設計使得storm十分穩定。【譯自apache storm官網】

Storm 編程模型

Topology 

        業務處理模型

Spout 

       數據源組件,用於獲取數據,可通過文件或者消息隊列【kafkaactiveMQ】中獲取數據

Bolt 

       邏輯處理組件

 

     簡單理解,topology【拓撲結構】就是包含了數據源、邏輯處理組件的一個外在集合框架,使用storm可以定義一個topologyset多少個數據源組件,多少個邏輯處理組件。下面通過demo來具體解釋Storm編程模型的幾個主要元組

     例如現在需要對一組數據進行處理,將數據中所有的英文轉成大寫,再加上標識后綴,最后保存到本地文本中,當然這只是一個特別簡單的數據處理邏輯,僅用於幫助大家理解Storm編程模型。 那根據Storm的編程模型,實現這個數據處理需求需要建立1個數據源Spout組件,2個業務邏輯組件Bolt,以及一個Topology結構,將這3個組件加入到這個topology結構中。

public class RandomSpout extends BaseRichSpout{
SpoutOutputCollector collector=null;
String[] goods={"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
/*
* 獲取消息並發送給下一個組件的方法,會被storm 不斷地調用
* 從goods 數組中隨機獲取一個商品名封裝到tuple中去
*/
@Override
public void nextTuple() {
Random random=new Random();
String good=goods[random.nextInt(goods.length)];

//封裝到tuple中發送給下一個組件
collector.emit(new Values(good));
}

//進行初始化,只在開始時調用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
/*
* 定義tunple的schema
*
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("src_word"));
}
}

     數據源Spout組件通過繼承Storm基類,重寫三個最核心的方法,分別是open、nextTuple、和delcare方法;open是在將執行數據傳遞之前所執行的方法,用於初始化數據;nextTuple中核心方法就是collector的emit方法,用於將數據傳遞給下一個元組。delcare用於成名元組傳遞、接收數據的格式,可以簡單的理解為給傳遞的數據加上一個標識鍵。

public class UpperBolt extends BaseBasicBolt {

//每來一個消息元組tuple,都會被執行一次該方法
@Override
public void execute(Tuple tuple,BasicOutputCollector collector) {
//從tuple 中拿到數據--原始商品名
String src_word=tuple.getString(0);//獲取下標第一個消息
String upper=src_word.toUpperCase();
//發送出去
collector.emit(new Values(upper));
}
//給消息申明一個字段名
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("upper"));
}
}

     這個邏輯處理bolt 用於將spout數據源組件中傳遞的元組轉成大寫格式,先獲取tuple的數據,然后emit發送給下一個元組。

/*
* 給商品名稱添加后綴,然后寫入文件中
*/
public class SuffixBolt extends BaseBasicBolt{
FileWriter file =null;
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
file = new FileWriter("D://eclipse_plugin"+UUID.randomUUID());
} catch (IOException e) {
e.printStackTrace();
}
}
//每一次執行都去new 一個writer ,應該在調用excute 之前先把writer 初始化好==持續運行
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//從消息元組中拿到上一個組件發送過來的數據
String upper=tuple.getString(0);
String result=upper +"_suffix";
try {
file.append(result);
file.append("/n");
} catch (IOException e) {

e.printStackTrace();
}
}
//聲明該組件要發送出去的tuple的字段定義
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
}
}

     bolt和spout一樣,繼承storm基類之后,也會有prepare方法用於准備數據,初始化一些對象;excute方法則是每每傳遞過來一個元組,便會觸發執行一次,這個bolt的作用在於將上一個元組傳遞過來的數據加上后綴處理,然后寫入本地文件中。

     那么,寫好了這些基礎的數據源和業務邏輯處理元組,如何組織他們的數據傳遞關系,這就是Topology類的職責。

/*
* 描述topology的結構,以及創建topology並提交給集群
*/
public class TopoMain {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder=new TopologyBuilder();

//設置消息源組件 4表示spout進程個數
builder.setSpout("randomSpout", new RandomSpout(),4);

//設置邏輯處理組件
//shuffleGrouping 指定接收哪個組件傳過來的消息
builder.setBolt("upper", new UpperBolt(),4).shuffleGrouping("randomSpout");
builder.setBolt("result", new SuffixBolt(),4).shuffleGrouping("upper");

//創建一個topology
StormTopology topology=builder.createTopology();

Config config=new Config();
config.setNumWorkers(4);//設置進程個數
config.setDebug(true);//設置調試狀態
config.setNumAckers(0);//消息應答器,事務性不是很強,可設置為0

//提交topology到storm 定義一個名稱,好在集群里去標識;通過配置對象傳遞參數給集群,集群根據這些參數,任務調度進行調整
StormSubmitter.submitTopology("demotopo", config, topology);
}
}

     Topology類便將之前編寫的1個spout 和2個bolt組裝到一個topology中,並通過追加shuffleGrouping方法設置了他們之間的數據傳遞方向,以及進程個數。

     通過這個實例應該對storm的編程模型和編碼流程有了簡單的認識。但這只是storm的大山一小角,例如zookeeper對storm集群主從節點的管理、storm與消息中間件的結合處理海量數據,復雜的數據處理流程,這些才是storm真正大展身手的地方。



注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com