HQueue:基於HBase的消息隊列


​1. HQueue簡介

HQueue是一淘搜索網頁抓取離線系統團隊基於HBase開發的一套分布式、持久化消息隊列。它利用HTable存儲消息數據,借助HBase Coprocessor將原始的KeyValue數據封裝成消息數據格式進行存儲,並基於HBase Client API封裝了HQueue Client API用於消息存取。

HQueue可以有效使用在需要存儲時間序列數據、作為MapReduce Job和iStream等輸入、輸出供上下游共享數據等場合。

​2. HQueue特性

由於HQueue是基於HBase進行消息存取的,因此站在HDFS和HBase的肩膀上,使得其具備如下特點:

​(1)支持多Partitions,可根據需求設置Queue的規模,支持高並發訪問(HBase的多Region);

​(2)​支持自動Failover,任何機器Down掉,Partition可自動遷移至其他機器(HBase的Failover機制);

(3)​支持動態負載均衡,Partition可以動態被調度到最合理的機器上(HBase的LoadBalance機制,可動態調整);

​(4)利用HBase進行消息的持久化存儲,不丟失數據(HBase HLog和HDFS Append);

​(5)隊列的讀寫模式與HBase的存儲特性天然切合,具備良好的並發讀寫性能(最新消息存儲在MemStore中,寫消息直接寫入MemStore,通常場景下都是內存級操作);

​(6)支持消息按Topic進行分類存取(HBase中的Qualifier);

​(7)支持消息TTL,自動清理過期消息(HBase支持KeyValue級別的TTL);

​(8)HQueue = HTable Schema Design + HQueue Coprocessor + HBase Client Wrapper,完全擴展開發,無任何Hack工作,可隨HBase自動升級;

(9)​HQueue Client API基於HBase Client Wrapper進行簡單封裝,HBase的ThriftServer使得其支持多語言API,因此HQueue也很容易封裝出多語言API;
(10)HQueue Client API可以天然支持Hadoop MapReduce Job和iStream的InputFormat機制,利用Locality特性將計算調度到存儲最近的機器;

​(11)HQueue支持消息訂閱機制(HQueue 0.3及后續版本)。

3. HQueue系統設計及處理流程

3.1. HQueue系統結構

​HQueue系統結構如圖(1)所示:

HQueue系統結構圖(1):HQueue系統結構

其中:​

​(1)每個Queue對應一個HTable,創建Queue可以通過Presharding Table方式創建,有利於負載均衡。

​(2)每個Queue可以有多個Partitions(HBase Regions),這些Partitions均勻分布在HBase集群中的多個Region Servers中。

​(3)每個Partition可以在HBase集群的多個Region Servers中動態遷移。任何一台Region Server掛掉,運行在其上的HQueue Partition可以自動遷移到其他Region Server上,並且數據不會丟失。當集群負載不均衡時,HQueue Partition會自動被HMaster遷移到負載低的Region Server。

​(4)每個Message對應一個HBase KeyValue Pair,按MessageID即時間順序存儲在HBase Region中。MessageID由Timestamp和同一Timestamp下自增的SequenceID構成,詳細信息參見《Message存儲結構》部分。

​3.2. Message存儲結構

​Message存儲結構如圖(2)所示:

Message存儲結構

圖(2):Message存儲結構

其中:​

​(1)RowKey:由PartitionID和MessageID構成。

  • ​PartitionID:​一個Queue可以有多個Partitions,目前最多支持Short.MAX_VALUE個Partitions。Partition ID可以不在創建Message對象時指定,而是在發送消息時設定,或者不指定而使用一個隨機Partition ID。
  • ​MessageID:即消息ID,它由Timestamp和SequenceID兩部分組成。Timestamp是消息寫入HQueue時的時間戳,單位為毫秒。SequenceID是同一Timestamp下消息的順序編號,目前最多支持同一Timestamp下Short.MAX_VALUE個Messages。

​(2)Column:由Column Family和Message Topic構成。

  • Column Family:HBase Column Family,此處為固定值“message”。
  • Message Topic :HBase Column Qualifier,消息Topic名稱。用戶可以根據需要將Message存儲在不同的Topics之下,也可以從Queue中獲取感興趣的Topics消息數據。

​(3)Value:即消息內容。

​3.3. HQueue消息寫入及Coprocessor處理流程

​HQueue利用HQueue Client API寫入消息數據,為保證消息唯一和有序,HQueue利用Coprocessor處理用戶寫入消息的MessageID,然后立即放入HBase MemStore中,使其可以被訪問到,最后持久化的HLog中。具體的處理邏輯如圖(3)所示:

數據寫入及Coprocessor處理流程

​圖(3):數據寫入及Coprocessor處理流程

​其中:

​(1)HQueue封裝了HQueue Client API,用戶可以使用其提供Put等方法向HQueue中寫入消息。

(2)HQueue Client會使用Message.makeKeyValueRow()用於完成將Message數據結構轉換成HBase Rowkey。HQueue所要求的RowKey格式可以參加上述內容。
(3)HQueue Client在完成RowKey的轉換后,會調用HTable的put方法按照HBase標准的寫入流程來完成消息的寫入。
(4)HQueue上注冊有HQueueCoprocessor,它擴展自BaseRegionObserver。HRegion在真正寫入消息數據前,會調用HQueueCoprocessor的preBatchMutate方法,該方法主要用於調整MessageID,保證MessageID唯一並且有序。
(5)在HQueueCoprocessor的preBatchMutate方法中同時會調整Durability為SKIP_WAL,這樣HBase將不會主動將消息數據持久化進HLog。
(6)HRegion在寫入消息數據后,會調用HQueueCoprocessor的postBatchMutate方法,該方法主要完成將消息數據持久化進HLog的功能。

​3.4. HQueue Scan處理流程

​為了方便從Queue中Scan數據,HQueue封裝了ClientScanner,提供了QueueScanner、PartitionScanner和CombinedPartitionScanner等Scanner,用於不同的場景。HQueue Scan的具體處理流程如圖(4)所示:

HQueue Scan處理流程

圖(4):HQueue Scan處理流程

其中:

(1)用戶可以根據需要從HQueue Client中獲取所需的Queue Scanner,目前主要提供三種Scanner:

  • QueueScanner:用於Scan Queue中全部Partitions的數據;
  • ​PartitionScanner:用於Scan Queue中指定Partition的數據;
  • ​CombinedPartitionScanner:用於Scan Queue中若干指定Partitions的數據。

(2)用戶獲取到Scanner之后,可以循環調用Scanner的next方法依次取出消息數據,直至無數據返回,本次Scan結束。Scan結束后,用戶應主動關閉Scanner以便及時釋放資源。
(3)用戶在不再使用先前創建的Queue對象時,應主動關閉Queue以便及時釋放資源。

​3.5. HQueue訂閱流程

3.5.1. 整體流程

HQueue自0.3版本開始提供訂閱功能,一個訂閱者可以訂閱一個Queue的多個Partitions、多個Topics。與用戶使用Scanner主動Scan消息數據的方式相比,訂閱方式具有(1)消息數據一旦寫入Queue便會被主動推送至訂閱者,消息送達更為及時;(2)訂閱者被動接收新消息,可以省去HQueue無新消息數據時多余的Scan操作,減少系統開銷等優點。

HQueue訂閱流程處理邏輯如圖(5)所示:

HQueue訂閱流程處理邏輯

​圖(5):HQueue訂閱流程處理邏輯

其中:

(1)HQueue訂閱主要由Subscriber、ZooKeeper和Coprocessor這三部分組成。其中:

  • ​Subscrier:即訂閱者。主要完成向ZoeoKeeper寫入訂閱信息、啟動監聽、接收新消息並回調注冊在其上的消息處理函數(MessageListener)等功能。
  • ​ZooKeeper:用於保存訂閱者提交的訂閱信息,主要包括訂閱者訂閱的Queue、Partitions和Topics;訂閱者的地址和Checkpoint等信息,更為詳細信息參見后續描述。
  • ​Coprocessor:主要完成從ZooKeeper獲取訂閱信息、使用InternalScanner從Queue中Scan最新的消息、將新消息發送至訂閱者並將當前Checkpoint更新至ZooKeeper等功能。

(2)Coprocessor的主要處理流程如下:
Step 1:創建Subscriber,添加訂閱信息和消息處理函數,將訂閱信息寫入ZooKeeper,啟動監聽等待接收新消息。寫入ZooKeeper中的訂閱信息主要包括:

  • ​訂閱者訂閱的Queue名稱;
  • ​訂閱者訂閱的Queuee Partitions以及各Partition上消息的起始ID。一個訂閱者可以訂閱多個Partitions,如果沒有指定,那么認為訂閱該Queue的所有Partitions。
  • ​訂閱者訂閱的消息Topics。一個訂閱者可以訂閱多個主題,如果沒有指定,那么認為訂閱該Queue上的所有Topics。
  • ​訂閱者的Addresss/Hostname和監聽端口。用戶創建訂閱者時可以指定監聽端口,如果沒有指定,那么會隨機選擇一個當前可用端口作為監聽端口。

Step 2:Coprocessor從ZooKeeper獲取訂閱信息並向ZooKeeper注冊相關Watcher,以便ZooKeeper中訂閱信息發生變化時ZooKeeper能夠及時通知Coprocessor。Coprocessor在獲取到訂閱信息后,會根據需要創建SubscriptionWorker等工作線程,以便從HQueue Partition中Scan消息並將消息發送至Subscriber。
Step 3:Coprocessor從HQueue Partition中Scan新消息。
Step 4:Coprocessor將新消息發送至Subscriber。
Step 5:Subscriber在接收到新消息時,會回調注冊在其上的回調函數。
Step 6:待新消息發送成功后,Coprocessor會將消息的Checkpoint更新至ZooKeeper以便后續使用。
Step 7:Subscriber取消訂閱,並從ZooKeeper中刪除必要的訂閱信息。
Step 8:ZooKeeper會通過注冊在其上的Watcher將Subscriber訂閱信息的變化通知至Coprocessor,Coprocessor根據訂閱信息的變化,暫停SubscriptionWorker等工作線程等。

3.5.2. HQueue Subscriber

​HQueue Subscriber結構和主要處理邏輯如圖(6)所示:

HQueue Subscriber結構和主要處理邏輯

​圖(6):HQueue Subscriber結構和主要處理邏輯

其中:

​(1)Subscriber主要由兩部分組成:SubscriberZooKeeper和Thrift Server。其中,SubscriberZooKeeper主要完成與ZooKeeper相關的若干操作,包括寫入訂閱信息、刪除訂閱信息等。Coprocessor與Subscriber之間的通訊通過Thrift來完成,Subscriber中啟動Thrift Server,監聽指定的端口,等待接收Coprocessor發送過來的新消息。

(2)Subscriber通過Thrift Server接收到新消息后,會回調注冊在其上的回調函數(MessageListeners),並將狀態碼返回給Coprocessor。
(3)可以在一個Subscriber上注冊多個MessageListeners,多個MessageListeners會被依次調用。

​3.5.3. HQueue Coprocessor

​HQueue Coprocessor結構和主要處理邏輯如圖(7)所示:

HQueue Coprocessor結構和主要處理邏輯

​圖(7):HQueue Coprocessor結構和主要處理邏輯

其中:

​(1)Coprocessor:主要由兩部分構成SubscriptionZooKeeper和SubscriptionWorker。

  • SubscriptionZooKeeper:主要完成與ZooKeeper相關的工作,包括從ZooKeeper獲取訂閱信息並注冊相關Watcher、SubscriptionWorker將Checkpoint更新至ZooKeeper等操作。
  • SubscriptionWorker又主要包括MessageScanner和MessageSender兩部分,主要完成Scan新消息、發送消息至Subscriber和更新Checkpoint等操作。

(2)MessageScanner主要完成創建InternalScanner,從Queue Partition中Scan新消息,並將其放入緩沖隊列中等操作。

  • ​當緩沖隊列中沒有空閑空間時,MessageScanner會等待直至緩沖隊列中的消息被MessageSender消費掉,騰出剩余空間。
  • ​當Queue Partition中沒有新消息時,MessageScanner會主動Sleep,當有新消息寫入時,Coprocessor會通過SubscriptionWorker喚醒MessageScanner,開始新一輪Scan。

(3)MessageSender主要完成從緩沖隊列中取出新消息,將其發送至Subscriber,並等待Subscriber發回響應等操作。當緩沖隊列中沒有新消息時,MessageSender會等待直至有新消息到來。
(4)MessageSender中的CheckpointUpdater會定時將當前的Checkpoint寫入ZooKeeper中的相關訂閱節點中,以便后續使用。

​3.5.4. 訂閱信息層次結構

HQueue相關訂閱信息保存在ZooKeeper,ZooKeeper中訂閱信息的層次結構如圖(8)所示:

訂閱信息層次結構

圖(8):訂閱信息層次結構

其中:

(1)訂閱者節點(subscriber_x)上會記錄該訂閱者在Queue Partition上的Checkpoint。該Checkpoint由Subscriber在發起訂閱時寫入,並由SubscriptionWorker MessageSender中的CheckpointUpdater來更新。
(2)訂閱者節點下會有兩個臨時性節點:address和topics,分別保存訂閱者的IP Address/Hostname:Port和訂閱的主題。當訂閱者主動取消訂閱時會刪除這兩個臨時節點,當訂閱者意外退出時,等Session失效后,ZooKeeper會刪除該臨時節點。

​3.5.5. 訂閱者Thrift Service

HQueue訂閱功能使用Thrift來簡化對多語言客戶端的支持。Subscriber啟動Thrift Server,監聽指定端口,接收消息,並回調MessageListeners以便處理消息。用於描述HQueue Subscriber所提供服務的接口定義如下所示:

namespace java com.etao.hadoop.hbase.queue.thrift.generated
/**
* HQueue MessageID
*/
struct TMessageID {
1: i64 timestamp,
2: i16 sequenceID
}
/**
* HQueue Message
*/
struct TMessage {
1: optional TMessageID id,
2: optional i16 partitionID,
3: binary topic,
4: binary value
}
/**
* HQueue Subscriber Service
*/
service HQueueSubscriberService {
i32 consumeMessages(1:list<TMessage> messages)
}

4. HQueue使用

4.1. HQueue Toolkit

為方便用戶使用,HQueue封裝了HQueue Client API用於存取消息數據。自HQueue 0.3版本,HQueue日志運維工具集成進HQueue Shell中,構成HQueue Toolkit,為用戶提供一站式服務,方便用戶管理Queue以及Queue訂閱者。

同HBase Shell使用方式相似,用戶使用$ ${HBASE_HOME}/bin/hqueue shell便可以進入HQueue Shell命令行工具。需要注意的是,用戶在使用HQueue Toolkit之前需要確保已經部署HQueue Toolkit。

​ HQueue Toolkit中包括創建Queue、Disable Queue、Enable Queue、刪除Queue和清空Queue等命令。​使用示例如下:

(1)創建隊列

USAGE:create ‘queue_name’, partition_count, ttl, [Configuration Dictionary]

DESCRIPTIONS:

  • queue_name:待創建的HQueue的名稱,必選參數。
  • partition_count:待創建的HQueue的Partition個數,必選參數。
  • ttl:失效時間,必選參數。
  • Configuration Dictonary:可選配置參數。目前支持的配置參數為:(1)hbase.hqueue.partitionsPerRegion;(2)hbase.hregion.memstore.flush.size;(3)hbase.hregion.majorcompaction;(4)hbase.hstore.compaction.min;(5)hbase.hstore.compaction.max;(6)hbase.hqueue.compression;(7)hbase.hstore.blockingStoreFiles等。
EXAMPLES:
  • hqueue> create ‘q1′, 32, 86400
  • hqueue> create ‘q1′, 32, 86400, {‘hbase.hqueue.partitionsPerRegion’ => ’4′, ‘hbase.hstore.compaction.min’ => ’16′, ‘hbase.hstore.compaction.max’ => ’32′}

(2)清空隊列

USAGE:truncate_queue 'queue_name'
DESCRIPTIONS:
  • queue_name:待清空的Queue名稱,必選參數。
EXAMPLES:
  • hqueue(main):013:0> truncate_queue 'replication_dev_2_test_queue'
需要注意的是:該命令與HBase Shell中的truncate有所不同,該命令僅會刪除Queue中的數據,而保留Queue的Presharding信息。
​    更多操作請參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#Queue.E7.AE.A1.E7.90.86
(3)新增訂閱者
USAGE:add_subscriber 'queue_name', 'subscriber_name'
DESCRIPTIONS:
  • queue_name:隊列名稱,必選參數。
  • subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
  • add_subscriber 'replication_dev_2_test_queue', 'subscriber_1'

(4)刪除訂閱者

USAGE:delete_subscriber 'subscriber_name', 'queue_name'
DESCRIPTIONS:
  • queue_name:訂閱者所訂閱的Queue名稱,必選參數。
  • subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
  • hqueue(main):040:0> delete_subscriber 'replication_dev_2_test_queue', 'subscriber_1'

更多信息可以參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#.E8.AE.A2.E9.98.85.E8.80.85.E7.AE.A1.E7.90.86

4.2. Put

​HQueue Client API中的Put相關操作可以完成將用戶消息數據寫入HQueue中,Put支持批量操作,具體使用方式示例如下:

HQueue queue = new HQueue(queueName);

String topic1 = "crawler";
String value1 = "http://www.360test.com";

// 寫入單條消息數據,不指定Partition ID。在不指定Partition ID的情況下,將會在Queue的所有Partitions中隨機選取一個。
Message message1 = new Message(Bytes.toBytes(topic1), Bytes.toBytes(value1));
queue.put(message);

// 寫入Message時,顯式指定PartitionID。
short partitionID = 10;
queue.put(partitionID, message1);

List<Message> messages = new ArrayList<Message>();
messages.add(message1);

String topic2 = "dump";
String value2 = "http://www.jd.com";
Message message2 = new Message(Bytes.toBytes(topic2), Bytes.toBytes(value2));
messages.add(message2);

// 寫入多條消息數據,不指定Partition ID。
queue.put(messages);

// 寫入多條消息數據,指定Partition ID。
queue.put(partitionID, messages);

queue.close();

4.3. Scan

​為方便用戶從Queue中Scan消息數據,HQueue Client API提供了三種自定義Scanner,分別為:QueueScanner、PartitionScanner和CombinedPartitionScanner,使用示例如下:

String queueName = "subscription_queue";
Queue queue = new HQueue(queueName);

// 起始時間戳
long currentTimestamp = System.currentTimeMillis();
MessageID startMessageID = new MessageID(currentTimestamp - 6000);
MessageID stopMessageID = new MessageID(currentTimestamp);

Scan scan = new Scan(startMessageID, stopMessageID);
// 添加主題
scan.addTopic(Bytes.toBytes("topic1"));
scan.addTopic(Bytes.toBytes("topic2"));

Message message = null;

// 使用QueueScanner,掃描Queue下全部Partitions中的數據
QueueScanner queueScanner = queue.getQueueScanner(scan);
while ((message = queueScanner.next()) != null) {
// no-op
}
queueScanner.close();

short partitionID1 = 1;

// 使用PartitionScanner,掃描Queue中指定的Partition的數據
PartitionScanner partitionScanner = queue.getPartitionScanner(partitionID1, scan);
while ((message = partitionScanner.next()) != null) {
// no-op
}
​partitionScanner.close();

short partitionID2 = 2;
Map<Short, Scan> partitions = new HashMap<Short, Scan>();
// 添加多個Partitions
partitions.put(partitionID1, scan);
partitions.put(partitionID2, scan);

CombinedPartitionScanner combinedScanner = queue.getCombinedPartitionScanner(partitions);
while ((message = combinedScanner.next()) != null) {
// no-op
}
​combinedScanner.close();

​queue.close();

​4.4. 訂閱消息

​HQueue自0.3版本開始提供訂閱功能,使用方式示例如下:

HQueue queue = null;
HQueueSubscriber subscriber = null;

try {
String queueName = "subscription_queue";
queue = new HQueue(queueName);

Set<Pair<Short, MessageID>> partitions = new HashSet<Pair<Short, MessageID>>();

// 添加所訂閱的Partitions
Pair<Short, MessageID> partition1 = new Pair<Short, MessageID>((short)0, null);
partitions.add(partition1);
Pair<Short, MessageID> partition2 = new Pair<Short, MessageID>((short)1, null);
partitions.add(partition2);
Pair<Short, MessageID> partition3 = new Pair<Short, MessageID>((short)2, null);
partitions.add(partition3);

// 添加所訂閱的Topics
Set<String> topics = new HashSet<String>();
topics.add("topic_1");
topics.add("topic_2");
topics.add("topic_3");

// 訂閱者名稱
String subscriberName = "subscriber_1";

Subscription subscription = new Subscription(subscriberName, topics);
subscription.addPartitions(partitions);

// 添加回調函數
List<MessageListener> listeners = new LinkedList<MessageListener>();
MessageListener blackHoleListener = new BlackHoleMessageListener(subscriberName);
listeners.add(blackHoleListener);

// 創建訂閱者
subscriber = queue.createSubscriber(subscription, listeners);

subscriber.start();

Thread.sleep(600000L);
​ subscriber.stop("Time out, request to stop subscriber:" + subscriberName);
​} catch (Exception ex) {
LOG.error("Received unexpected exception when testing subscription.", ex);
} finally {
if (queue != null) {
try {
queue.close();
queue=null;
} catch (IOException ex) {
// ignore the exception
}
}
}

4.5. ThriftServer API

​HBase自帶的ThriftServer實現了對HTable的多語言API支持,HQueue在HBase ThriftServer中擴展了對HQueue的支持,使得C++、Python和PHP等語言也可以方便地訪問HQueue。

​HQueue目前提供的Thrift API如下所示:

1 ScannerID messageScannerOpen(1:Text queueName,2:i16 partitionID,3:TMessageScan messageScan) 根據Scan,打開Queue中某個Partition上的Scanner
2 TMessage messageScannerGet(1:ScannerID id) 逐條獲取Message
3 list<TMessage> messageScannerGetList(1:ScannerID id,2:i32 nbMessages) 批量獲取Messages
4 void messageScannerClose(1:ScannerID id) 關閉ScannerID
5 void putMessage(1:Text queueName,2:TMessage tMessage) 向Queue中寫入Message,使用隨機的Partition ID
6 void putMessages(1:Text queueName,2:list<TMessage> tMessages) 向Queue中批量寫入Messages,使用隨機的Partition ID
7 void putMessageWithPid(1:Text queueName,2:i16 partitionID,3:TMessage tMessage) 向Queue中寫入Message,使用指定的Partition ID
8 void putMessagesWithPid(1:Text queueName,2:i16 partitionID,3:list<TMessage> tMessages) 向Queue中批量寫入Messages,使用指定的Partition ID
9 list<Text> getQueueLocations(1:Text queueName) 獲取Queue中所有Partition所在主機的地址

5. 總結

以上是對HQueue概念、特性、系統設計、處理流程以及應用等方面的簡單闡述,希望對大家有所幫助。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com