典型架構力如下:
各組件職責:
Channel :負責數據聚合或暫存,比如暫存到內存、本地文件、數據庫、Kafka 等,日志數據不會在管道停留很長時間,委快會被 Sink 消費掉。
Sink :也叫接收器,負責數據轉移存儲,比如從Channel拿到日志后直接存儲到HDFS、Hbase、ElasticSearch、Kafka 等。
細分 Flume 數據流應該是由5個組件組成:Events、Sources、Channels、Sink、Agent。基中三個如上所述,Events與Agent 如下:
Events :是使用Flume移動的數據的基本單位。它類似於JMS中的消息,通常很小。它由頭和字節數組體組成。
原理方面的這里暫時不深入,先運行一個簡單的Demo再說…
其實flume的用法很簡單—-書寫一個配置文件,在配置文件當中描述source、channel與sink的具體實現,而后運行一個agent實例,在運行agent實例的過程中會讀取配置文件的內容,這樣flume就會采集到數據。
筆者是在HDP平台上安裝的Flume, 因此無需進行任何配置便可以運用Flume。
部署與安裝步驟只是通過 Ambari 的web 界面,點擊 Actions , 選擇 add services ,根據提示直接Next便安裝完成。
查看flume 是否安裝好 及其版本號
[root@hdp06 conf]# flume-ng version
Flume 1.5.2.2.5.3.0-37
這個案例主要的功能是監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件就可以獲取到信息並打印到日志中,當然如果想將監控的日志寫入到Hdfs也是很簡單的,只需修改對應的配置便可,這里以快速入門為由所以只把其記錄下。
詳細操作步驟如下所示:
1、編寫配置文件
注:這里的配置文件指的是一個Flume任務相關的配置文件,例如這里監聽一個指定網絡端口的配置,並非安裝相關的配置文件。
創建一個配置文件(文件可以隨便找個目錄放置):
touch testNetcat.conf
2、編寫配置文件:
vi testNetcat.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、啟動 flume agent a1 服務端
[root@hdp03 conf]# flume-ng agent -n a1 -c ../conf -f testNetcat.conf -Dflume.root.logger=DEBUG,console
如果要想flume 命令在后台執行,可在上面命令最后添加一個“&”符號,便可在后台運行了,實際開發過程,一般通過編寫啟停腳本來運行。
啟動程序的部分日志如下:
17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
17/08/25 15:07:24 INFO node.Application: Starting Sink k1
17/08/25 15:07:24 INFO node.Application: Starting Source r1
17/08/25 15:07:24 INFO source.NetcatSource: Source starting
17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.135.21.3:55555]
從上面日志可看出其在監聽 10.135.21.3:55555
4、通過另一台Linux 服務器向10.135.21.3:55555 端口發送信息
[root@hdp04 root]# telnet hdp03 55555
Trying 10.194.67.6...
Connected to hdp06.
Escape character is '^]'.
I am bad boy...
OK
戰狼2很好看哦
OK
可在 10.135.21.3 監聽服務的日志中看到如下信息
17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.194.67.6:55555]
17/08/25 15:16:43 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 21 21 21 21 21 Hello Flume!!!!! }
17/08/25 15:20:02 INFO sink.LoggerSink: Event: { headers:{} body: 49 20 61 6D 20 62 61 64 20 62 6F 79 2E 2E 2E 0D I am bad boy.... }
17/08/25 15:20:40 INFO sink.LoggerSink: Event: { headers:{} body: E6 88 98 E7 8B BC 32 E5 BE 88 E5 A5 BD E7 9C 8B ......2......... }
上述就是這個簡單案例的全部過程。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
用編程思路來解釋上面三行代碼就相當與定義了 sources、sinks、channels 對應的變量,方便下面對它們的引用。其中 a1為agent 名,可隨意命令,但要注意在啟動時要與之對應,eg: flume-ng agent -n a1 …
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555
設置了r1(即sources) 接收的資源的類型為netcat, 監聽地址為10.135.21.3 ,端口為55555
# Describe the sink
a1.sinks.k1.type = logger
指定了k1(sinks) 輸出日志的形式
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
設置了c1 (channel) 的緩存機制,memory 將日志緩存在內存,
capacity:默認該通道中最大的可以存儲的event數量是1000,
trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
上面代碼就很明了,將sources 與 sinks 綁定 channel
flume 寫的不錯的入門文檔
Flume架構以及應用介紹
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。