開發與維運

數據湖實操講解【OSS 訪問加速】第八講:Flume 高效寫入 OSS

本期導讀 :【OSS 訪問加速】第八講

主題:Flume 高效寫入 OSS

講師:焱冰,阿里巴巴計算平臺事業部 EMR 技術專家

內容框架:

  • Flume 簡介
  • Flume 常用組件
  • Flume 使用 JindoFS SDK
  • Flume 實戰 JindoFS SDK

直播回放鏈接:(7/8講)

https://developer.aliyun.com/live/246851

一、Flume 簡介

Apache Flume 簡介

  • Apache Flume 是 Apache 基金會的一個頂級項目,以下簡稱 Flume。
  • Flume 是一個分佈式、可靠、高可用的系統,支持從不同數據源高效地收集、聚合、遷移大量日誌數據,聚合到中心化的數據存儲服務。
  • Flume 使用最多的場景是日誌收集,也可以通過定製 Source 來傳輸其他不同類型的數據。
  • E-MapReduce 從 3.16.0 版本開始支持 Apache Flume。

image.png

Flume 中的概念及術語

image.png

一個 Flume Agent 由 Source、Channel、Sink 組成。

Event

  • 數據流通過 Flume Agent 的基本單位。
  • Event 由一個裝載字節數組負載(Payload)和一個可選的字符串屬性集合組成。

image.png

Source

  • 數據源收集器,從外部數據源收集數據,併發送到 Channel。

Channel

  • Source 和 Sink 之間的緩衝隊列。

Sink

  • 從 Channel 中獲取 Event ,並將以事務的形式 commit 到外部存儲中。一旦事務 commit 成功,該 Event 會從 Channel 中移除。

二、Flume 常用組件

常用組件介紹

常見 Source

  • Avro Source:通過監聽 Avro 端口獲取 Avro Client 發送的事件。Avro 是 Hadoop 提供的一種協議,用於序列化反序列化數據。
  • Exec Source:通過監聽命令行輸出獲取數據,如 tail -f /var/log/messages。
  • NetCat TCP Source: 監聽指定 tcp 端口獲取數據。類似的還有 Netcat UDP Source。
  • Taildir Source: 監控目錄下的多個文件,會記錄偏移量,不會丟失數據,最為常用。


常見 Channel

  • Memory Channel: 緩存到內存中,性能高,最為常用。
  • File Channel: 緩存到文件中,會記錄 checkpoint 和 data 文件,可靠性高,但性能較差。
  • JDBC Channel: 緩存到關係型數據庫中。
  • Kakfa Channel:通過 Kafka 來緩存數據。


常見 Sink

  • Logger Sink: 用於測試
  • Avro Sink: 轉換成 Avro Event,主要用於連接多個 Flume Agent。
  • HDFS Sink: 寫入 HDFS,最為常用。
  • Hive sink: 寫入 Hive 表或分區,使用 Hive 事務寫 events。
  • Kafka sink: 寫入 Kafka。


文檔

  • 官方文檔:
    https://flume.apache.org/documentation.html
  • 中文文檔:
    https://flume.liyifeng.org/

三、Flume 使用 JindoFS SDK

Flume 使用 JindoFS SDK 寫入 OSS

環境要求

在集群上已經部署 Flume,已部署 JindoSDK 3.4 以上版本。

為什麼需要使用 JindoFS SDK 寫入 OSS

Flume 通過 flush() 調用保證事務性寫入,OSS 本身不支持 Flush 功能,通過 JindoFS SDK 寫入 OSS,雖然不能讓 flush 後的數據立刻可見,但是可以保證 flush() 後的數據不丟失,Flume 作業失敗後,可以使用 JindoFS 命令恢復 flush 過的數據。

配置示例

xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H 
xxx.sinks.oss_sink.hdfs.batchSize = 100000 
xxx.sinks.oss_sink.hdfs.round = true
xxx.sinks.oss_sink.hdfs.roundValue = 15
xxx.sinks.oss_sink.hdfs.Unit = minute
xxx.sinks.oss_sink.hdfs.filePrefix = your_topic
xxx.sinks.oss_sink.rollSize = 3600
xxx.sinks.oss_sink.threadsPoolSize = 30

- 文檔鏈接⭐

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md

在 EMR 集群內對 Flush 文件恢復

jindo jfs -recover [-R]
                   [-flushStagingPath {flushStagingPath}]
                   [-accessKeyId ${accessKeyId}]
                   [-accessKeySecret ${accessKeySecret}]
                   <path>

注:如需遞歸恢復(-R),建議先停止 Flume 任務,避免 Flume 任務運行異常。

在 EMR 集群外對 Flush 文件恢復

JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs; 
boolean isFolder = true; 
jindoFileSystem.recover(path, isFolder);

- 文檔鏈接

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md

四、Flume 實戰 JindoFS SDK

自建Flume 使用 JindoFS SDK 壓縮寫入 OSS

環境準備

Hadoop-2.8.5

下載

F​lume-1.9.0:wgethttps://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

添加依賴

cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib
cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib
cp hadoop-common-2.8.5.jar $FLUME_HOME/lib
cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib
cp commons-io-2.4.jar $FLUME_HOME/lib
cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O 
$FLUME_HOME/lib/jindofs-sdk-3.5.0.jar

配置 JindoFS SDK

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md

配置

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/test.log

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 20

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = oss://yanbin-hd2-test/%Y-%m-%d/%H

a1.sinks.k1.hdfs.filePrefix = test

a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.rollCount = 20

a2.sinks.k1.hdfs.minBlockReplicas = 1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

日誌仿真

while true; do echo `date` >> /tmp/test.log; sleep 1; done

Flume 啟動

bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf  -Dflume.root.logger=INFO,console

結果

image.png

直接觀看第四課(7/8講)視頻回放,獲取實例講解~

https://developer.aliyun.com/live/246851



Github鏈接:

https://github.com/aliyun/alibabacloud-jindofs

不錯過每次直播信息、探討更多數據湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *