詳解MapReduce的模式、算法和用例


MapReduce常用組件介紹

HadoopMapReduce jobs可以切分成一系列運行於分布式集群中的map和reduce任務,每個任務只運行全部數據的一個指定的子集,以此達到整個集群的負載平衡。Map任務通常為加載,解析,轉換,過濾數據,每個reduce處理map輸出的一個子集。Reduce任務會去map任務端copy中間數據來完成分組,聚合。

MapReduce 的輸入是hdfs上存儲的一系列文件集。每個map任務被分為以下階段:record reader,mapper,combiner,partitioner。Map任務的輸出叫中間數據,包括keys和values,發送到reduce端。Reduce任務分為以下階段:shuffle,sort,reduce,output format。運行map任務的節點會盡量選擇數據所在節點。Combiner 是一個map階段分組數據,可選的,局部reducer。它根據用戶提供的方法在一個mapper范圍內根據中間鍵去聚合值。Partitioner會獲取從mapper(或combiner)來的鍵值對,並分割成分片,每個reducer一個分片。默認用哈希值,典型使用md5sum。然后partitioner根據reduce的個數執行取余運算:key.hashCode() % (number of reducers)。這樣能隨機均勻的根據key分發數據到reduce,同時保證不同mapper的相同key分到同一個reduce。Partitioner也可以自定義,使用更高級的樣式,例如排序。然而,更改partitioner很少用。Partitioner的每個map的數據會寫到本地磁盤,並等待對應的reducer檢測,拿走數據。
       Reduce任務開始於shuffle和sort階段。這一階段獲取partitioner的輸出文件,並下載到reduce運行的本地機器。這些分片數據會根據key合並,排序成一個大的數據文件。排序的目的是讓相同的key相鄰,方便在reduce階段值得迭代處理。
這一階段不能自定義,由框架自動處理。需要做的只是key的選擇和可以自定義用於分組的比較器。


基本MapReduce模式

計數與求和

問題陳述: 有許多文檔,每個文檔都有一些字段組成。需要計算出每個字段在所有文檔中的出現次數或者這些字段的其他什么統計值。例如,給定一個log文件,其中的每條記錄都包含一個響應時間,需要計算出平均響應時間。

解決方案: Mapper每遇到指定詞就把頻數記1,Reducer遍歷這些詞的集合然后把他們的頻數累加求和。

class Mapper

    method Map(docid id, doc d)

        for all term t in doc d do

              Emit(term t, count 1)

class Reducer

    method Reduce(term t, counts [c1, c2,...])

         sum = 0

         for all count c in [c1,c2,...] do

               sum = sum + c

               Emit(term t, count sum)

這種方法的缺點顯而易見,Mapper提交了太多無意義的計數。它完全可以通過先對每個文檔中的詞進行計數從而減少傳遞給Reducer的數據量:

class Mapper

   method Map(docid id, doc d)

      H = new AssociativeArray

      for all term t in doc d do

           H{t} = H{t} + 1

      for all term t in H do

         Emit(term t, count H{t})

如果要累計計數的不只是單個文檔中的內容,還包括了一個Mapper節點處理的所有文檔,那就要用到Combiner了:

class Mapper

   method Map(docid id, doc d)

      for all term t in doc d do

         Emit(term t, count 1)

class Combiner

   method Combine(term t, [c1, c2,...])

      sum = 0

       for all count c in [c1, c2,...] do

           sum = sum + c

      Emit(term t, count sum)

 class Reducer

   method Reduce(term t, counts [c1, c2,...])

      sum = 0

      for all count c in [c1, c2,...] do

           sum = sum + c

      Emit(term t, count sum)

應用1:Log分析,數據查詢,整理歸類

問題陳述:有一系列條目,每個條目都有幾個屬性,要把具有同一屬性值的條目都保存在一個文件里,或者把條目按照屬性值分組, 最典型的應用是倒排索引。

  解決方案:在 Mapper 中以每個條目的所需屬性值作為key,其本身作為值傳遞給Reducer。Reducer取得按照屬性值分組的條目,然后可以處理或者保存。如果是在構建倒排索引,那么 每個條目相當於一個詞而屬性值就是詞所在的文檔ID。

應用2:倒排索引,ETL,過濾(文本查找),解析和校驗

       問題陳述:假設有很多條記錄,需要從其中找出滿足某個條件的所有記錄,或者將每條記錄轉換成另外一種形式(轉換操作相對於各條記錄獨立,即對一條記錄的操作與其他記錄無關)。像文本解析、特定值抽取、格式轉換等都屬於后一種用例。

  解決方案:在Mapper 里逐條進行操作,輸出需要的值或轉換后的形式。

  應用3:日志分析,數據查詢,ETL,數據校驗,分布式任務執行

  問題陳述:大型計算可以分解為多個部分分別進行然后合並各個計算的結果以獲得最終結果。

  解決方案: 將數據切分成多份作為每個 Mapper的輸入,每個Mapper處理一份數據,執行同樣的運算,產生結果,Reducer把多個Mapper的結果組合成一個。

  案例研究: 數字通信系統模擬,像 WiMAX 這樣的數字通信模擬軟件通過系統模型來傳輸大量的隨機數據,然后計算傳輸中的錯誤幾率。 每個Mapper 處理樣本1/N 的數據,計算出這部分數據的錯誤率,然后在Reducer 里計算平均錯誤率。

  應用4:工程模擬,數字分析,性能測試,排序

  問題陳述:有許多條記錄,需要按照某種規則將所有記錄排序或是按照順序來處理記錄。

解決方案:Mappers將待排序的屬性值為鍵,整條記錄為值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之所以被稱為MapReduce核心的原因(“核心”是說排序?因為證明Hadoop計算能力的實驗是大數據排序?還是說Hadoop的處理過程中對key排序的環節?)。在實踐中,常用組合鍵來實現二次排序和分組。(MapReduce最初只能夠對鍵排序,但是也可以利用Hadoop的特性來實現按值排序。)

按照BigTable的概念,使用MapReduce來對最初數據而非中間數據排序,也即保持數據的有序狀態更有好處,必須注意這一點。換句話說,在數據插入時排序一次要比在每次查詢數據的時候排序更高效。

  應用5:ETL,數據分析,非基本MapReduce 模式,迭代消息傳遞 (圖處理)

  問題陳述:假設一個實體網絡,實體之間存在着關系。需要按照與它比鄰的其他實體的屬性計算出一個狀態。這個狀態可以表現為它和其它節點之間的距離,存在特定屬性的鄰接點的跡象,鄰域密度特征等等。

解決方案:網絡存儲為系列節點的結合,每個節點包含有其所有鄰接點ID的列表。按照這個概念,MapReduce迭代進行,每次迭代中每個節點都發消息給它的鄰接點。鄰接點根據接收到的信息更新自己的狀態。當滿足了某些條件的時候迭代停止,如達到了最大迭代次數(網絡半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper以每個鄰接點的ID為鍵發出信息,所有的信息都會按照接受節點分組,reducer就能夠重算各節點的狀態然后更新那些狀態改變了的節點。下面展示了這個算法:

class Mapper

  method Map(id n, object N)

     Emit(id n, object N)

     for all id m in N.OutgoingRelations do

        Emit(id m, message getMessage(N))

class Reducer

  method Reduce(id m, [s1, s2,...])

     M = null

     messages = []

     for all s in [s1, s2,...] do

         if IsObject(s) then

             M = s

         else               // s is amessage

             messages.add(s)

     M.State = calculateState(messages)

     Emit(id m, item M)

一個節點的狀態可以迅速的沿着網絡傳全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示一樣:


案例研究:沿分類樹的有效性傳遞

  問題陳述:這個問題來自於真實的電子商務應用。將各種貨物分類,這些類別可以組成一個樹形結構,比較大的分類(像男人、女人、兒童)可以再分出小分類(像男褲或女裝),直到不能再分為止(像男式藍色牛仔褲)。這些不能再分的基層類別可以是有效(這個類別包含有貨品)或者已無效的(沒有屬於這個分類的貨品)。如果一個分類至少含有一個有效的子分類那么認為這個分類也是有效的。我們需要在已知一些基層分類有效的情況下找出分類樹上所有有效的分類。

  解決方案:這個問題可以用上一節提到的框架來解決。我們在下面定義了名為 getMessage和calculateState 的方法:

class N

   State in {True = 2, False = 1, null = 0},

   initialized 1 or 2 for end-of-line categories, 0 otherwise

      method getMessage(object N)

          return N.State

      method calculateState(state s, data [d1,d2,...])

          return max( [d1, d2,...] )

案例研究:廣度優先搜索

  問題陳述:需要計算出一個圖結構中某一個節點到其它所有節點的距離。

  解決方案: Source源節點給所有鄰接點發出值為0的信號,鄰接點把收到的信號再轉發給自己的鄰接點,每轉發一次就對信號值加1:

class N

   State is distance,

   initialized 0 for source node, INFINITY for all other nodes

methodgetMessage(N)

          return N.State + 1

methodcalculateState(state s, data [d1, d2,...])

          min( [d1, d2,...] )

案例研究:網頁排名和 Mapper端數據聚合

  這個算法由Google提出,使用權威的PageRank算法,通過連接到一個網頁的其他網頁來計算網頁的相關性。真實算法是相當復雜的,但是核心思想是權重可以傳播,也即通過一個節點的各聯接節點的權重的均值來計算節點自身的權重。

class N

     State is PageRank

       methodgetMessage(object N)

         return N.State /N.OutgoingRelations.size()

     method calculateState(state s, data [d1,d2,...])

          return ( sum([d1, d2,...]) )

要指出的是上面用一個數值來作為評分實際上是一種簡化,在實際情況下,我們需要在Mapper端來進行聚合計算得出這個值。下面的代碼片段展示了這個改變后的邏輯(針對於PageRank 算法):

class Mapper

   method Initialize

      H = new AssociativeArray

   method Map(id n, object N)

      p = N.PageRank  /N.OutgoingRelations.size()

      Emit(id n, object N)

      for all id m in N.OutgoingRelations do

         H{m} = H{m} + p

   method Close

      for all id n in H do

         Emit(id n, value H{n})

class Reducer

   method Reduce(id m, [s1, s2,...])

      M = null

      p = 0

      for all s in [s1, s2,...] do

           if IsObject(s) then

              M = s

           else

              p = p + s

      M.PageRank = p

      Emit(id m, item M)

應用6:圖分析,網頁索引,值去重(對唯一項計數)

  問題陳述: 記錄包含值域F和值域G,要分別統計相同G值的記錄中不同的F值的數目(相當於按照G分組)這個問題可以推而廣之應用於分面搜索(某些電子商務網站稱之為NarrowSearch)

 Record 1: F=1, G={a, b}

 Record 2: F=2, G={a, d, e}

 Record 3: F=1, G={b}

 Record 4: F=3, G={a, b}

 Result:

 a -> 3 // F=1, F=2, F=3

 b -> 2 // F=1, F=3

 d -> 1 // F=2

 e -> 1 // F=2

解決方案 I:第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個復合值對,然后在Reducer中輸出每個值對,目的是為了保證F值的唯一性。在第二階段,再將值對按照G值來分組計算每組中的條目數。

第一階段:

class Mapper

   method Map(null, record [value f, categories [g1, g2,...]])

     for all category g in [g1, g2,...]

       Emit(record [g, f], count 1)

class Reducer

   method Reduce(record [g, f], counts [n1, n2, ...])

     Emit(record [g, f], null )

第二階段:

class Mapper

   method Map(record [f, g], null)

     Emit(value g, count 1)

class Reducer

   method Reduce(value g, counts [n1, n2,...])

     Emit(value g, sum( [n1, n2,...] ) )

解決方案 II:第二種方法只需要一次MapReduce即可實現,但擴展性不強。算法很簡單-Mapper輸出值和分類,在Reducer里為每個值對應的分類去重然后給每個所屬的分類計數加1,最后再在Reducer結束后將所有計數加和。這種方法適用於只有有限個分類,而且擁有相同F值的記錄不是很多的情況。例如網絡日志處理和用戶分類,用戶的總數很多,但是每個用戶的事件是有限的,以此分類得到的類別也是有限的。值得一提的是在這種模式下可以在數據傳輸到Reducer之前使用Combiner來去除分類的重復值。

class Mapper

methodMap(null, record [value f, categories [g1, g2,...] )

forall category g in [g1, g2,...]

Emit(valuef, category g)

class Reducer

methodInitialize

H= new AssociativeArray : category -> count

methodReduce(value f, categories [g1, g2,...])

[g1',g2',..] = ExcludeDuplicates( [g1, g2,..] )

forall category g in [g1', g2',...]

H{g}= H{g} + 1

methodClose

forall category g in H do

Emit(categoryg, count H{g})

應用7:日志分析,用戶計數,互相關

  問題陳述:有多個各由若干項構成的組,計算項兩兩共同出現於一個組中的次數。假如項數是N,那么應該計算N*N。這種情況常見於文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什么)。如果N*N小到可以容納於一台機器的內存,實現起來就比較簡單了。

  配對法:第一種方法是在Mapper中給所有條目配對,然后在Reducer中將同一條目對的計數加和。但這種做法也有缺點:使用combiners 帶來的的好處有限,因為很可能所有項對都是唯一的,不能有效利用內存

class Mapper

methodMap(null, items [i1, i2,...] )

for allitem i in [i1, i2,...]

forall item j in [i1, i2,...]

Emit(pair[i j], count 1)

class Reducer

methodReduce(pair [i j], counts [c1, c2,...])

s= sum([c1, c2,...])

Emit(pair[ij], count s)

第二種方法是將數據按照pair中的第一項來分組,並維護一個關聯數組,數組中存儲的是所有關聯項的計數。Thesecond approach is to group data by the first item in pair and maintain anassociative array (“stripe”) where counters for all adjacent items areaccumulated. Reducer receives all stripes for leading item i, merges them, andemits the same result as in the Pairs approach.(中間結果的鍵數量相對較少,因此減少了排序消耗。可以有效利用combiners。可在內存中執行,不過如果沒有正確執行的話也會帶來問題,實現起來比較復雜。)

一般來說, “stripes” 比“pairs” 更快

class Mapper

methodMap(null, items [i1, i2,...] )

forall item i in [i1, i2,...]

H= new AssociativeArray : item -> counter

forall item j in [i1, i2,...]

H{j}= H{j} + 1

Emit(itemi, stripe H)

class Reducer

methodReduce(item i, stripes [H1, H2,...])

H= new AssociativeArray : item -> counter

H= merge-sum( [H1, H2,...] )

forall item j in H.keys()

Emit(pair[i j], H{j})

應用8:文本分析,市場分析,References:LinJ. Dyer C. Hirst G. Data Intensive Processing MapReduce,用MapReduce表達關系模式。

在這部分我們會討論一下怎么使用MapReduce來進行主要的關系操作(篩選(Selection)

class Mapper

methodMap(rowkey key, tuple t)

ift satisfies the predicate

Emit(tuplet, null)

投影(Projection):投影只比篩選稍微復雜一點,在這種情況下我們可以用Reducer來消除可能的重復值。

class Mapper

methodMap(rowkey key, tuple t)

tupleg = project(t) // extract required fields to tuple g

Emit(tupleg, null)

class Reducer

methodReduce(tuple t, array n) // n is an array of nulls

Emit(tuplet, null)

合並(Union):兩個數據集中的所有記錄都送入Mapper,在Reducer里消重。

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuplet, null)

class Reducer

methodReduce(tuple t, array n) // n is an array of one or two nulls

(tuplet, null)

交集(Intersection):將兩個數據集中需要做交叉的記錄輸入Mapper,Reducer輸出出現了兩次的記錄。因為每條記錄都有一個主鍵,在每個數據集中只會出現一次,所以這樣做是可行的

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuplet, null)

class Reducer

methodReduce(tuple t, array n) // n is an array of one or two nulls

ifn.size() = 2

Emit(tuplet, null)

差異(Difference):假設有兩個數據集R和S,我們要找出R與S的差異。Mapper將所有的元組做上標記,表明他們來自於R還是S,Reducer只輸出那些存在於R中而不在S中的記錄。

class Mapper

methodMap(rowkey key, tuple t)

Emit(tuplet, string t.SetName) // t.SetName is either 'R' or 'S'

class Reducer

methodReduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S','R']

ifn.size() = 1 and n[1] = 'R'

Emit(tuplet, null)

分組聚合(GroupByand Aggregation):分組聚合可以在如下的一個MapReduce中完成。Mapper抽取數據並將之分組聚合,Reducer中對收到的數據再次聚合。典型的聚合應用比如求和與最值可以以流的方式進行計算,因而不需要同時保有所有的值。但是另外一些情景就必須要兩階段MapReduce,前面提到過的惟一值模式就是一個這種類型的例子。

class Mapper

method Map(null,tuple [value GroupBy, value AggregateBy, value ...])

Emit(valueGroupBy, value AggregateBy)

class Reducer

methodReduce(value GroupBy, [v1, v2,...])

Emit(valueGroupBy, aggregate( [v1, v2,...] ) ) // aggregate() : sum(), max(),...

連接(Joining):MapperReduce框架可以很好地處理連接,不過在面對不同的數據量和處理效率要求的時候還是有一些技巧。

       分配后連接(Reduce端連接,排序-合並連接):這個算法按照鍵K來連接數據集R和L。Mapper遍歷R和L中的所有元組,以K為鍵輸出每一個標記了來自於R還是L的元組,Reducer把同一個K的數據分裝入兩個容器(R和L),然后嵌套循環遍歷兩個容器中的數據以得到交集,最后輸出的每一條結果都包含了R中的數據、L中的數據和K。這種方法有以下缺點:Mapper要輸出所有的數據,即使一些key只會在一個集合中出現;Reducer要在內存中保有一個key的所有數據,如果數據量打過了內存,那么就要緩存到硬盤上,這就增加了硬盤IO的消耗。盡管如此,再分配連接方式仍然是最通用的方法,特別是其他優化技術都不適用的時候。

class Mapper

methodMap(null, tuple [join_key k, value v1, value v2,...])

Emit(join_keyk, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer

methodReduce(join_key k, tagged_tuples [t1, t2,...])

H= new AssociativeArray : set_name -> values

forall tagged_tuple t in [t1, t2,...] // separate values into 2 arrays

H{t.tag}.add(t.values)

forall values r in H{'R'} // produce a cross-join of the two arrays

forall values l in H{'L'}

Emit(null,[k r l] )

復制鏈接Replicated Join (Mapper端連接,Hash 連接):在實際應用中,將一個小數據集和一個大數據集連接是很常見的(如用戶與日志記錄)。假定要連接兩個集合R和L,其中R相對較小,這樣,可以把R分發給所有的Mapper,每個Mapper都可以載入它並以連接鍵來索引其中的數據,最常用和有效的索引技術就是哈希表。之后,Mapper遍歷L,並將其與存儲在哈希表中的R中的相應記錄連接,。這種方法非常高效,因為不需要對L中的數據排序,也不需要通過網絡傳送L中的數據,但是R必須足夠小到能夠分發給所有的Mapper。

class Mapper

methodInitialize

H= new AssociativeArray : join_key -> tuple from R

R= loadR()

forall [ join_key k, tuple [r1, r2,...] ] in R

H{k}= H{k}.append( [r1, r2,...] )

methodMap(join_key k, tuple l)

forall tuple r in H{k}

Emit(null,tuple [k r l] ) 



注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com