hbase源碼系列(十四)Compact和Split


先上一張圖講一下Compaction和Split的關系,這樣會比較直觀一些。

Compaction把多個MemStore flush出來的StoreFile合並成一個文件,而Split則是把過大的文件Split成兩個。

之前在Delete的時候,我們知道它其實並沒有真正刪除數據的,那總不能一直不刪吧,下面我們就介紹一下它刪除數據的過程,它就是Compaction。

在講源碼之前,先說一下它的分類和作用。

Compaction主要起到如下幾個作用:

1)合並文件

2)清除刪除、過期、多余版本的數據

3)提高讀寫數據的效率

Minor & Major Compaction的區別

1)Minor操作只用來做部分文件的合並操作以及包括minVersion=0並且設置ttl的過期版本清理,不做任何刪除數據、多版本數據的清理工作。

2)Major操作是對Region下的HStore下的所有StoreFile執行合並操作,最終的結果是整理合並出一個文件。

先說一下怎么使用吧,下面分別是它們是shell命令,可以在hbase的shell里面執行。

//major compaction
major compact '表名或region名'

//minor compaction
compact '表名或region名'

下面我們開始看入口吧,入口在HBaseAdmin,找到compact方法,都知道我們compact可以對表操作或者對region進行操作。

1、先把表或者region相關的region信息和server信息全部獲取出來

2、循環遍歷這些region信息,依次請求compact操作

AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
CompactRegionRequest request
= RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
try {
admin.compactRegion(
null, request);
}
catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}

到這里,客戶端的工作就結束了,我們直接到HRegionServer找compactRegion這個方法吧。

    //major compaction多走這一步驟
if (major) {
if (family != null) {
store.triggerMajorCompaction();
}
else {
region.triggerMajorCompaction();
}
}
    
//請求compaction走這里
if(family != null) {
compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
null);
}
else {
compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
null);
}

我們先看major compaction吧,直接去看triggerMajorCompaction和requestCompaction方法。

Compaction

進入方法里面就發現了它把forceMajor置為true就完了,看來這個參數是major和minor的開關,接着看requestCompaction。

CompactionContext compaction = null;
if (selectNow) {
compaction
= selectCompaction(r, s, priority, request);
if (compaction == null) return null; // message logged inside
}
// 要根據文件的size來判斷用給個大的線程池還是小的線程池
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool
= (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(
new CompactionRunner(s, r, compaction, pool));

上面的步驟是執行selectCompaction創建一個CompactionContext,然后提交CompactionRunner。

我們接着看CompactionContext的創建過程吧,這里還需要分是用戶創建的Compaction和系統創建的Compaction。

1、創建CompactionContext

2、判斷是否是非高峰時間,下面是這兩個參數的值

int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);

3、選擇需要進行compaction的文件,添加到CompactionRequest和filesCompacting列表當中

compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());

我們看看這個select的具體實現吧。

public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request
= compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}

這里的select方法,從名字上看是壓縮策略的意思,它是由這個參數控制的hbase.hstore.defaultengine.compactionpolicy.class,默認是ExploringCompactionPolicy這個類。

接着看ExploringCompactionPolicy的selectCompaction方法,發現這個方法是繼承來的,找它的父類RatioBasedCompactionPolicy。

public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
ArrayList
<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
//從candidateSelection排除掉filesCompacting中的文件
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// 如果不是強制major的話,包含了過期的文件,先刪除過期的文件
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList
<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis()
- cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
//居然還要跳過大文件,看來不是major的還是不行的,凈挑小的弄
candidateSelection = skipLargeFiles(candidateSelection);
}
// 是不是major的compaction還需要判斷,做這個操作還是比較謹慎的
boolean majorCompaction = (
(forceMajor
&& isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);

if (!majorCompaction) {
  
//過濾掉bulk load進來的文件
candidateSelection = filterBulk(candidateSelection);
//過濾掉一些不滿足大小的文件
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
//檢查文件數是否滿足最小的要求,文件不夠,也不做compaction
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
//非major的超過最大可以compact的文件數量也要剔除掉,major的只是警告一下
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result
= new CompactionRequest(candidateSelection);
result.setOffPeak(
!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
View Code

從上面可以看出來,major compaction的選擇文件幾乎沒什么限制,只要排除掉正在compacting的文件就行了,反而是minor compact有諸多的排除選項,因為默認的compaction是定時執行的,所以它這方面的考慮吧,排除太大的文件,選擇那些過期的文件,排除掉bulkload的文件等等內容。

Minor Compaction的文件選擇策略

我們再簡單看看applyCompactionPolicy這個方法吧,它是minor的時候用的,它的過程就像下圖一樣。

這個是雙層循環: 

從0開始,循環N遍(N=文件數),就相當於窗口向右滑動,指針為start

----->從currentEnd=start + MinFiles(默認是3)-1,每次增加一個文件作為考慮,類似擴張的動作, 窗口擴大, 指針為

-------------->從candidateSelection文件里面取出(start, currentEnd + 1)開始

-------------->小於最小compact數量文件,默認是3,continue

-------------->大於最大compact數量文件,默認是10,continue

-------------->獲取這部分文件的大小

-------------->如果這部分文件數量比上次選擇方案的文件還小,替換為最小文件方案

-------------->大於MemStore flush的大小128M並且符合有一個文件不滿這個公式(FileSize(i) <= ( 文件總大小- FileSize(i) ) * Ratio),continue

       (注意上面的Ratio是干嘛的,這個和前面提到的非高峰時間的數值有關系,非高峰時段這個數值是5,高峰時間段這個值是1.2, 這說明高峰時段不允許compact過大的文件)

-------------->開始判斷是不是最優的選擇(下面講的mayBeStuck是從selectCompaction傳入的,可選擇的文件超過7個的情況,上面黃色那部分代碼)

          1)如果mayBeStuck並且不是初次,如果 文件平均大小 > 上次選擇的文件的平均大小*1.05, 替換上次的選擇文件方案成為最優解

          2)初次或者不是mayBeStuck的情況,文件更多的或者文件相同、總文件大小更小的會成為最新的選擇文件方案

如果經過比較之后的最優文件選擇方案不為空,就把它返回,否則就把最小文件方案返回。

下面是之前的Ratio的參數值,需要配合之前提到的參數配合使用的。

hbase.hstore.compaction.ratio              高峰時段,默認值是1.2
hbase.hstore.compaction.ratio.offpeak 非高峰時段,默認值是5

 

到這里先來個小結吧,從上面可以看得出來,這個Minor Compaction的文件選擇策略就是選小的來,選最多的小文件來合並。

選擇文件結束,回到compact的主流程

4、把CompactionRequest放入CompactionRunner,走線程池提交

之前的代碼我再貼一下,省得大家有點凌亂。

ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(
new CompactionRunner(s, r, compaction, pool));

我們去看CompactionRunner的run方法吧,它也在當前的類里面。

      if (this.compaction == null) {this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); 
     // 出口,實在選不出東西來了,它會走這里跑掉
     if (this.compaction == null) return;
     // ....還有別的限制,和父親運行的線程池也要一致,尼瑪,什么邏輯

   
}
    boolean completed = region.compact(compaction, store);if (completed) { // blocked的regions再來一次,這次又要一次compaction意欲何為啊
// 其實它的出口在上面的那段代碼,它執行之后,沒有這里這么惡心
if (store.getCompactPriority() <= 0) { requestSystemCompaction(region, store, "Recursive enqueue");
}
else {
// compaction之后的region可能很大,超過split的數量就要split了
requestSplit(region); }

先是對region進行compact,如果完成了,判斷一下優先級,優先級小於等於0,請求系統級別的compaction,否則請求split。

我們還是先看HRegion的compact方法,compact開始前,它要先上讀鎖,不讓讀了,然后調用HStore中的compact方法。

     // 執行compact,生成新文件
List<Path> newFiles = compaction.compact();
//把compact生成的文件移動到正確的位置
sfs = moveCompatedFilesIntoPlace(cr, newFiles);
//記錄WALEdit日志
writeCompactionWalRecord(filesToCompact, sfs);
//更新HStore相關的數據結構
replaceStoreFiles(filesToCompact, sfs);/
/
歸檔舊的文件,關閉reader,重新計算file的大小 completeCompaction(filesToCompact);

comact生成新文件的方法很簡單,給源文件創建一個StoreScanner,之前說過StoreScanner能從多個Scanner當中每次都取出最小的kv,然后用StoreFile.Append的方法不停地追加寫入即可,這些過程在前面的章節都介紹過了,這里不再重復。

簡單的說,就是把這些文件合並到一個文件去了,尼瑪,怪不得io那么大。

剩下的就是清理工作了,這里面有意思的就是它會記錄一筆日志到writeCompactionWalRecord當中,在之間日志恢復那一章的時候,貼出來的代碼里面有,只是沒有詳細的講。因為走到這里它已經完成了compaction的過程,只是沒有把舊的文件移入歸檔文件當中,它掛掉重啟的時候進行恢復干的事情,就是替換文件。

5、store.getCompactPriority() 下一步是天堂抑或是地獄?

compact完了,要判斷一下這個,真是天才啊

public int getStoreCompactionPriority() {
int blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
int priority = blockingFileCount - storefiles.size();
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

比較方法是這個,blockingFileCount的默認值是7,如果compact之后storefiles的文件數量大於7的話,就很有可能再觸發一下,那么major compaction觸發的可能性低,minor觸發的可能性非常大。

不過沒關系,實在選不出文件來,它會退出的。咱們可以將它這個參數hbase.hstore.blockingStoreFiles設置得大一些,弄出來一個比較大的數字。

Split

好,我們接着看requestSplit。

if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
byte[] midKey = r.checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
return true;
}
}

先檢查一下是否可以進行split,如果可以,把中間的key返回來。

那條件是啥?在這里,if的條件是成立的,條件判斷在IncreasingToUpperBoundRegionSplitPolicy的shouldSplit方法當中。

遍歷region里面所有的store

1、Store當中不能有Reference文件。

2、store.size > Math.min(getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * (long)tableRegionsCount)) 就返回ture,可以split。

getDesiredMaxFileSize()默認是10G,由這個參數來確定hbase.hregion.max.filesize, 當沒超過10G的時候它就會根據128MB * (該表在這個RS上的region數量)平方。

midKey怎么找呢?找出最大的HStore,然后通過它來找這個分裂點,最大的文件的中間點。

return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);

但是如果是另外一種情況,我們通過客戶端來分裂Region,我們強制指定的分裂點,這種情況是按照我們設置的分裂點來進行分裂。

分裂點有了,我們接着看,我們發現它又提交了一個SplitRequest線程,看run方法。

1、先獲得一個tableLock,給這個表上鎖

2、執行SplitTransaction的prepare方法,然后execute

3、結束了釋放tableLock

      // 先做准備工作,然后再execute執行主流程,過程當中出錯了,就rollback
if (!st.prepare()) return; try {
st.execute(
this.server, this.server);
}
catch (Exception e) {
try {
if (st.rollback(this.server, this.server)) {
}
catch (RuntimeException ee) {this.server.abort(msg);
}
return;
}

prepare方法當中,主要做了這么件事,new了兩個新的region出來

this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

我們接着看execute方法,這個是重頭戲。

PairOfSameType<HRegion> regions = createDaughters(server, services);
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());

總共分三步:

1、創建子region

2、上線子region

3、更改zk當中的狀態

我們先看createDaughters

    //在region-in-transition節點下給父region創建一個splitting的節點
createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b); this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);//在parent的region目錄下創建.splits目錄
this.parent.getRegionFileSystem().createSplitsDir();
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

Map
<byte[], List<StoreFile>> hstoreFilesToSplit = null;
//關閉parent,然后返回相應的列族和storefile的map hstoreFilesToSplit = this.parent.close(false);
//從在線列表里下線parent services.removeFromOnlineRegions(this.parent, null);
this.journal.add(JournalEntry.OFFLINED_PARENT);
// 把parent的storefile均分給兩個daughter,所謂均分,只是創建引用文件而已
splitStoreFiles(hstoreFilesToSplit);

// 把臨時的Region A目錄重名為正式的region A 的目錄
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION); HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);

// 把臨時的Region B目錄重名為正式的region B的目錄
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION); HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
this.journal.add(JournalEntry.PONR);

// 修改meta表中的信息,設置parent的狀態為下線、並且split過,在增加兩列左右孩子,左右孩子的信息也通過put插入到meta中
MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
return new PairOfSameType<HRegion>(a, b);

在splitStoreFiles這塊的,它給每個文件都開一個線程去進行split。

fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
fs.splitStoreFile(
this.hri_b, familyName, sf, this.splitrow, true);

這里其實是給每個文件都創建了Reference文件,無論它的文件當中包不包括splitRow。

    //parentRegion/.splits/region/familyName目錄
Path splitDir = new Path(getSplitsDir(hri), familyName);
// 其實它並沒有真正的split,而是通過創建Reference Reference r = top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
String parentRegionName = regionInfo.getEncodedName();
// 原來通過這么關聯啊,storefile名字 + 父parent的name Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
return r.write(fs, p);

把引用文件生成在每個子region對應的目錄,以便下一步直接重命令目錄即可。

重命名目錄之后,就是修改Meta表了,splitRegion的方法是通過Put來進行操作的,它修改parent的regioninfo這一列更新為最新的信息,另外又增加了splitA和splitB兩列,hri_a和hri_b則通過另外兩個Put插入到Meta表當中。

這個過程當中如果出現任何問題,就需要根據journal記錄的過程信息進行回滾操作。

怎么open這兩個子region就不講了,之前講《HMaster啟動過程》的時候講過了。

到這里split的過程就基本結束了,鑒於Compaction和Split的對io方面的巨大影響,所以在任何資料里面都是推薦屏蔽自動執行,寫腳本在晚上自動進行這些操作。

 

 

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com