Flume 入門與簡單運用


一、Flume 簡述

  • Flume是什么:通俗地說 Flume 就是一個日志采集工具。
  • 版本進化過程:分為 Flume-og(0.9x 已停止更新了)、Flume-ng(1.x) 兩個版本,Flume-ng最明顯的改動就是取消了集中管理配置的 Master 和 Zookeeper,變為一個純粹的傳輸工具。Flume-ng另一個主要的不同點是讀入數據和寫出數據現在由不同的工作線程處理(稱為 Runner)。在 Flume-og 中,讀入線程同樣做寫出工作(除了故障重試)。如果寫出慢的話(不是完全失敗),它將阻塞 Flume 接收數據的能力。
  • 數據處理方面: Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力 。提供了從console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系統),支持TCP和UDP等2種模式),exec(命令執行)等數據源上收集數據的能力。

二、Flume 架構

典型架構力如下:
這里寫圖片描述

  • 各組件職責:

    • Source : 負責日志流入,比如從文件、網絡、Kafka等數據源流入數據,數據流入的方式有兩種:輪訓拉取和事件驅動。
    • Channel :負責數據聚合或暫存,比如暫存到內存、本地文件、數據庫、Kafka 等,日志數據不會在管道停留很長時間,委快會被 Sink 消費掉。
      Sink :也叫接收器,負責數據轉移存儲,比如從Channel拿到日志后直接存儲到HDFS、Hbase、ElasticSearch、Kafka 等。

    • 細分 Flume 數據流應該是由5個組件組成:Events、Sources、Channels、Sink、Agent。基中三個如上所述,Events與Agent 如下:
      Events :是使用Flume移動的數據的基本單位。它類似於JMS中的消息,通常很小。它由頭和字節數組體組成。

三、 第一個Flume Demo

  • 原理方面的這里暫時不深入,先運行一個簡單的Demo再說…
    其實flume的用法很簡單—-書寫一個配置文件,在配置文件當中描述source、channel與sink的具體實現,而后運行一個agent實例,在運行agent實例的過程中會讀取配置文件的內容,這樣flume就會采集到數據。

  • 筆者是在HDP平台上安裝的Flume, 因此無需進行任何配置便可以運用Flume。
    部署與安裝步驟只是通過 Ambari 的web 界面,點擊 Actions , 選擇 add services ,根據提示直接Next便安裝完成。

  • 查看flume 是否安裝好 及其版本號
    [root@hdp06 conf]# flume-ng version
    Flume 1.5.2.2.5.3.0-37

  • 這個案例主要的功能是監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件就可以獲取到信息並打印到日志中,當然如果想將監控的日志寫入到Hdfs也是很簡單的,只需修改對應的配置便可,這里以快速入門為由所以只把其記錄下。

詳細操作步驟如下所示:

1、編寫配置文件

  注:這里的配置文件指的是一個Flume任務相關的配置文件,例如這里監聽一個指定網絡端口的配置,並非安裝相關的配置文件。

創建一個配置文件(文件可以隨便找個目錄放置):

touch testNetcat.conf

2、編寫配置文件:

vi  testNetcat.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、啟動 flume agent a1 服務端

[root@hdp03 conf]# flume-ng agent -n a1 -c ../conf -f testNetcat.conf -Dflume.root.logger=DEBUG,console

如果要想flume 命令在后台執行,可在上面命令最后添加一個“&”符號,便可在后台運行了,實際開發過程,一般通過編寫啟停腳本來運行。

啟動程序的部分日志如下:

17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
17/08/25 15:07:24 INFO node.Application: Starting Sink k1
17/08/25 15:07:24 INFO node.Application: Starting Source r1
17/08/25 15:07:24 INFO source.NetcatSource: Source starting
17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.135.21.3:55555]

從上面日志可看出其在監聽 10.135.21.3:55555

4、通過另一台Linux 服務器向10.135.21.3:55555 端口發送信息

[root@hdp04 root]# telnet hdp03 55555
Trying 10.194.67.6...
Connected to hdp06.
Escape character is '^]'.
I am bad boy...
OK
戰狼2很好看哦
OK

可在 10.135.21.3 監聽服務的日志中看到如下信息

17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.194.67.6:55555]
17/08/25 15:16:43 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 21 21 21 21 21 Hello Flume!!!!! }
17/08/25 15:20:02 INFO sink.LoggerSink: Event: { headers:{} body: 49 20 61 6D 20 62 61 64 20 62 6F 79 2E 2E 2E 0D I am bad boy.... }
17/08/25 15:20:40 INFO sink.LoggerSink: Event: { headers:{} body: E6 88 98 E7 8B BC 32 E5 BE 88 E5 A5 BD E7 9C 8B ......2......... }

上述就是這個簡單案例的全部過程。

5、配置文件簡單解說

a1.sources = r1
a1.sinks = k1
a1.channels = c1

用編程思路來解釋上面三行代碼就相當與定義了 sources、sinks、channels 對應的變量,方便下面對它們的引用。其中 a1為agent 名,可隨意命令,但要注意在啟動時要與之對應,eg: flume-ng agent -n a1 …

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555

設置了r1(即sources) 接收的資源的類型為netcat, 監聽地址為10.135.21.3 ,端口為55555

# Describe the sink
a1.sinks.k1.type = logger

指定了k1(sinks) 輸出日志的形式

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

設置了c1 (channel) 的緩存機制,memory 將日志緩存在內存,
capacity:默認該通道中最大的可以存儲的event數量是1000,
trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面代碼就很明了,將sources 與 sinks 綁定 channel

官方文檔如下:
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_flume-component-guide/content/understanding_flume.html

flume 寫的不錯的入門文檔
Flume架構以及應用介紹


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com