本期導讀 :【OSS 訪問加速】第七講
主題:Flink 高效 sink 寫入 OSS
講師:重湖,阿里巴巴計算平臺事業部 EMR 高級工程師
內容框架:
- 背景介紹
- 功能介紹
- 如何配置
- 如何使用
直播回放鏈接:(7/8講)
https://developer.aliyun.com/live/246851
一、背景介紹
Apache Flink 簡介
Apache Flink 是新一代大數據計算引擎的代表,以分佈式流計算為核心,同時支持批處理。特點:
- 低延時:Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下 Spark 流計算很難達到秒級
- 高吞吐:Flink 以分佈式快照算法實現容錯,對吞吐量的影響很小
- 高容錯:基於分佈式快照算法,Flink 實現了低代價、高效的容錯表現,以及 Exactly_Once 語義保證
JindoFS Flink Connector 產生背景
阿里雲對象存儲 Object Storage Service(OSS):
- 海量:無限容量,彈性伸縮
- 安全:12個9的數據安全性,多種加密方式
- 低成本:遠低於雲磁盤,且有多種存儲方式、生命週期管理等節約成本
- 高可靠:服務可用性 99.9%
- 已服務於海量用戶
Flink 應用廣泛:
- 流計算領域業內主要解決方案
- Apache 基金會最活躍項目之一
- 未來:流批一體、在線分析
Flink 使用痛點:
- 開源 ApacheFlink 尚不支持直接寫入 OSS
- Hadoop OSS SDK 寫入性能不一定滿足需求
JindoFS Flink Connector 介紹
整體架構:
兩階段 Checkpoint (檢查點) 機制:
- 第一階段 MPU (MultiPartUpload,分片上傳) 寫入 OSS
- 第二階段 MPU 提交
Recoverable Writer 可恢復性寫入:
- 臨時文件以普通文件格式上傳 OSS
- Sink 節點狀態快照
寫入 OSS vs. 寫入 亞馬遜S3:
- Native 實現:數據寫入以 C++ 代碼實現,相比 Java 更高效
- 高速讀寫:多線程讀寫臨時文件,對大於1MB的文件優勢尤其明顯
- 數據緩存:讀寫 OSS 實現本地緩存,加速外部訪問
OSS 訪問加速,JindoFS 提供新支持
二、如何配置
如何配置 JindoFS Flink Connector
環境要求:
- 集群上有開源版本 Flink 軟件,版本不低於1.10.1
SDK 配置:
下載所需 SDK 文件:
- jindo-flink-sink-${version}.jar
- jindofs-sdk-${version}.jar
- 下載鏈接⭐ (Github):
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_download.md
將兩個 jar 放置於集群 Flink 目錄下 lib 文件夾:
- Flink 根目錄通常可由 $FLINK_HOME 環境變量獲取
- 集群所有節點均需配置
Java SPI:自動加載資源,無需額外配置
⭐文檔鏈接(Github):
在程序中使用 JindoFS Flink Connector
確保集群能夠訪問 OSS Bucket
- 前提:已購買 OSS 產品,OSS 網站鏈接:
https://www.aliyun.com/product/oss - 確保能夠訪問 OSS Bucket,例如正確配置密鑰或免密服務等
使用合適的路徑,流式寫入OSS Bucket
- 寫入 OSS 須使用 oss:// 前綴路徑,類似於:
oss://<user-bucket>/<user-defined-sink-dir>
更多優化!用 JindoFS SDK 加速 OSS 訪問,參考
⭐Github:
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md
在程序中使用 JindoFS Flink Connector:Java
在程序中開啟 Flink Checkpoint
- 前提:使用可重發的數據源,如 Kafka
- 通過 StreamExecutionEnvironment 對象打開 Checkpoint(示例):
建立:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
打開:
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
示例程序
- 下文中,outputStream 是一個預先形成的 DataStream 對象,若需寫入 OSS,則可以這樣添加 sink:
String outputPath = "oss://<user-bucket>/<user-defined-sink-dir>"; StreamingFileSink<String> sink= StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
- 上述程序指定將 outputStream 中的String 內容寫入 OSS 路徑 oss:///,最後還需用 env.execute() 語句執行 Flink 作業,env 是已建立的 StreamExecutionEnvironment 對象
- 最後,將 Java 作業打包為 jar 文件,並用 flink run 在集群提交即可
在程序中使用 JindoFS Flink Connector:Pyflink
與Java 示例類似,在 Pyflink 中使用 JindoFS Flink Connector 與寫入 HDFS 等其他介質方式相同,只需:
- 將寫入路徑寫作合適的 OSS 路徑
- 注意打開 Checkpoint 功能
例如,下列 Python 程序定義了一張位於 OSS 的表:
sink_dest = "oss://<user-bucket>/<user-defined-sink-dir>" sink_ddl = f""" CREATE TABLE mySink ( uid INT, pid INT ) PARTITIONED BY ( pid ) WITH ( 'connector' = 'filesystem', 'fpath' = '{sink_dest}', 'format' = 'csv', 'sink.rolling-policy.file-size' = '2MB', 'sink.partition-commit.policy.kind' = 'success-file' ) """
然後將其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)
在程序中使用 JindoFS Flink Connector:更多配置
用戶通過 flink run 提交 java 或 pyflink 程序時,可以額外自定義一些參數,格式:
flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
目前支持“熵注入”及“分片上傳並行度”兩項配置
熵注入(entropyinjection):
- 功能:將寫入路徑的一段特定字符串匹配出來,用一段隨機的字符串進行替換
- 效果:削弱所謂 “片區” (sharding) 效應,提高寫入效率
- 配置參數:
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
分片上傳並行度
- 配置參數:oss.upload.max.concurrent.uploads
- 默認值:當前可用的處理器數量
直接觀看第四課(7/8講)視頻回放,獲取實例講解~
https://developer.aliyun.com/live/246851
⭐Github鏈接:
https://github.com/aliyun/alibabacloud-jindofs
不錯過每次直播信息、探討更多數據湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!