本期導讀 :【OSS 訪問加速】第八講
主題:Flume 高效寫入 OSS
講師:焱冰,阿里巴巴計算平臺事業部 EMR 技術專家
內容框架:
- Flume 簡介
- Flume 常用組件
- Flume 使用 JindoFS SDK
- Flume 實戰 JindoFS SDK
直播回放鏈接:(7/8講)
https://developer.aliyun.com/live/246851
一、Flume 簡介
Apache Flume 簡介
- Apache Flume 是 Apache 基金會的一個頂級項目,以下簡稱 Flume。
- Flume 是一個分佈式、可靠、高可用的系統,支持從不同數據源高效地收集、聚合、遷移大量日誌數據,聚合到中心化的數據存儲服務。
- Flume 使用最多的場景是日誌收集,也可以通過定製 Source 來傳輸其他不同類型的數據。
- E-MapReduce 從 3.16.0 版本開始支持 Apache Flume。
Flume 中的概念及術語
一個 Flume Agent 由 Source、Channel、Sink 組成。
Event
- 數據流通過 Flume Agent 的基本單位。
- Event 由一個裝載字節數組負載(Payload)和一個可選的字符串屬性集合組成。
Source
- 數據源收集器,從外部數據源收集數據,併發送到 Channel。
Channel
- Source 和 Sink 之間的緩衝隊列。
Sink
- 從 Channel 中獲取 Event ,並將以事務的形式 commit 到外部存儲中。一旦事務 commit 成功,該 Event 會從 Channel 中移除。
二、Flume 常用組件
常用組件介紹
常見 Source
- Avro Source:通過監聽 Avro 端口獲取 Avro Client 發送的事件。Avro 是 Hadoop 提供的一種協議,用於序列化反序列化數據。
- Exec Source:通過監聽命令行輸出獲取數據,如 tail -f /var/log/messages。
- NetCat TCP Source: 監聽指定 tcp 端口獲取數據。類似的還有 Netcat UDP Source。
- Taildir Source: 監控目錄下的多個文件,會記錄偏移量,不會丟失數據,最為常用。
常見 Channel
- Memory Channel: 緩存到內存中,性能高,最為常用。
- File Channel: 緩存到文件中,會記錄 checkpoint 和 data 文件,可靠性高,但性能較差。
- JDBC Channel: 緩存到關係型數據庫中。
- Kakfa Channel:通過 Kafka 來緩存數據。
常見 Sink
- Logger Sink: 用於測試
- Avro Sink: 轉換成 Avro Event,主要用於連接多個 Flume Agent。
- HDFS Sink: 寫入 HDFS,最為常用。
- Hive sink: 寫入 Hive 表或分區,使用 Hive 事務寫 events。
- Kafka sink: 寫入 Kafka。
文檔
- 官方文檔:
https://flume.apache.org/documentation.html - 中文文檔:
https://flume.liyifeng.org/
三、Flume 使用 JindoFS SDK
Flume 使用 JindoFS SDK 寫入 OSS
環境要求
在集群上已經部署 Flume,已部署 JindoSDK 3.4 以上版本。
為什麼需要使用 JindoFS SDK 寫入 OSS
Flume 通過 flush() 調用保證事務性寫入,OSS 本身不支持 Flush 功能,通過 JindoFS SDK 寫入 OSS,雖然不能讓 flush 後的數據立刻可見,但是可以保證 flush() 後的數據不丟失,Flume 作業失敗後,可以使用 JindoFS 命令恢復 flush 過的數據。
配置示例
xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H xxx.sinks.oss_sink.hdfs.batchSize = 100000 xxx.sinks.oss_sink.hdfs.round = true xxx.sinks.oss_sink.hdfs.roundValue = 15 xxx.sinks.oss_sink.hdfs.Unit = minute xxx.sinks.oss_sink.hdfs.filePrefix = your_topic xxx.sinks.oss_sink.rollSize = 3600 xxx.sinks.oss_sink.threadsPoolSize = 30
- 文檔鏈接⭐
在 EMR 集群內對 Flush 文件恢復
jindo jfs -recover [-R] [-flushStagingPath {flushStagingPath}] [-accessKeyId ${accessKeyId}] [-accessKeySecret ${accessKeySecret}] <path>
注:如需遞歸恢復(-R),建議先停止 Flume 任務,避免 Flume 任務運行異常。
在 EMR 集群外對 Flush 文件恢復
JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs; boolean isFolder = true; jindoFileSystem.recover(path, isFolder);
- 文檔鏈接⭐
四、Flume 實戰 JindoFS SDK
自建Flume 使用 JindoFS SDK 壓縮寫入 OSS
環境準備
Hadoop-2.8.5
下載
Flume-1.9.0:wgethttps://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
添加依賴
cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib cp commons-configuration-1.6.jar $FLUME_HOME/lib cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib cp hadoop-common-2.8.5.jar $FLUME_HOME/lib cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib cp commons-io-2.4.jar $FLUME_HOME/lib cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O $FLUME_HOME/lib/jindofs-sdk-3.5.0.jar
配置 JindoFS SDK
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md
配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/test.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 20
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = oss://yanbin-hd2-test/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.batchSize = 20
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.rollCount = 20
a2.sinks.k1.hdfs.minBlockReplicas = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
日誌仿真
while true; do echo `date` >> /tmp/test.log; sleep 1; done
Flume 啟動
bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf -Dflume.root.logger=INFO,console
結果
直接觀看第四課(7/8講)視頻回放,獲取實例講解~
https://developer.aliyun.com/live/246851
⭐Github鏈接:
https://github.com/aliyun/alibabacloud-jindofs
不錯過每次直播信息、探討更多數據湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!