開發與維運

數據湖實操講解【OSS 訪問加速】第七講:Flink 高效 sink 寫入 OSS

本期導讀 :【OSS 訪問加速】第七講

主題:Flink 高效 sink 寫入 OSS

講師:重湖,阿里巴巴計算平臺事業部 EMR 高級工程師

內容框架:

  • 背景介紹
  • 功能介紹
  • 如何配置
  • 如何使用

直播回放鏈接:(7/8講)

https://developer.aliyun.com/live/246851

一、背景介紹


Apache Flink 簡介

Apache Flink 是新一代大數據計算引擎的代表,以分佈式流計算為核心,同時支持批處理。特點:

  • 低延時:Flink 流式計算可以做到亞秒甚至毫秒級延時,相比之下 Spark 流計算很難達到秒級
  • 高吞吐:Flink 以分佈式快照算法實現容錯,對吞吐量的影響很小
  • 高容錯:基於分佈式快照算法,Flink 實現了低代價、高效的容錯表現,以及 Exactly_Once 語義保證

image.png

JindoFS Flink Connector 產生背景

阿里雲對象存儲 Object Storage Service(OSS):

  • 海量:無限容量,彈性伸縮
  • 安全:12個9的數據安全性,多種加密方式
  • 低成本:遠低於雲磁盤,且有多種存儲方式、生命週期管理等節約成本
  • 高可靠:服務可用性 99.9%
  • 已服務於海量用戶

Flink 應用廣泛:

  • 流計算領域業內主要解決方案
  • Apache 基金會最活躍項目之一
  • 未來:流批一體、在線分析

Flink 使用痛點:

  • 開源 ApacheFlink 尚不支持直接寫入 OSS
  • Hadoop OSS SDK 寫入性能不一定滿足需求

JindoFS Flink Connector 介紹

整體架構:

兩階段 Checkpoint (檢查點) 機制:

  • 第一階段 MPU (MultiPartUpload,分片上傳) 寫入 OSS
  • 第二階段 MPU 提交

Recoverable Writer 可恢復性寫入:

  • 臨時文件以普通文件格式上傳 OSS
  • Sink 節點狀態快照

image.png

寫入 OSS vs.  寫入 亞馬遜S3:

  • Native 實現:數據寫入以 C++ 代碼實現,相比 Java 更高效
  • 高速讀寫:多線程讀寫臨時文件,對大於1MB的文件優勢尤其明顯
  • 數據緩存:讀寫 OSS 實現本地緩存,加速外部訪問

OSS 訪問加速,JindoFS 提供新支持

image.png

二、如何配置

如何配置 JindoFS Flink Connector

環境要求:

  • 集群上有開源版本 Flink 軟件,版本不低於1.10.1

SDK 配置:

下載所需 SDK 文件:


將兩個 jar 放置於集群 Flink 目錄下 lib 文件夾:

  • Flink 根目錄通常可由 $FLINK_HOME 環境變量獲取
  • 集群所有節點均需配置

Java SPI:自動加載資源,無需額外配置

⭐文檔鏈接(Github):

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flink/jindofs_sdk_on_flink_for_oss.md

在程序中使用 JindoFS Flink Connector

確保集群能夠訪問 OSS Bucket

使用合適的路徑,流式寫入OSS Bucket

  • 寫入 OSS 須使用 oss:// 前綴路徑,類似於:

oss://<user-bucket>/<user-defined-sink-dir>

更多優化!用 JindoFS SDK 加速 OSS 訪問,參考

⭐Github:

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md

在程序中使用 JindoFS Flink Connector:Java

在程序中開啟 Flink Checkpoint

  • 前提:使用可重發的數據源,如 Kafka

  • 通過 StreamExecutionEnvironment 對象打開 Checkpoint(示例):

建立:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

打開:

env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);

示例程序

  • 下文中,outputStream 是一個預先形成的 DataStream 對象,若需寫入 OSS,則可以這樣添加 sink:
String outputPath = "oss://<user-bucket>/<user-defined-sink-dir>";
StreamingFileSink<String> sink= StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
  • 上述程序指定將 outputStream 中的String 內容寫入 OSS 路徑 oss:///,最後還需用 env.execute() 語句執行 Flink 作業,env 是已建立的 StreamExecutionEnvironment 對象
  • 最後,將 Java 作業打包為 jar 文件,並用 flink run 在集群提交即可

在程序中使用 JindoFS Flink Connector:Pyflink

與Java 示例類似,在 Pyflink 中使用 JindoFS Flink Connector 與寫入 HDFS 等其他介質方式相同,只需:

  • 將寫入路徑寫作合適的 OSS 路徑
  • 注意打開 Checkpoint 功能

例如,下列 Python 程序定義了一張位於 OSS 的表:

sink_dest = "oss://<user-bucket>/<user-defined-sink-dir>"
sink_ddl = f""" 
        CREATE TABLE mySink (
                uid INT,
                pid INT
        ) PARTITIONED BY (
                pid
        ) WITH (
                'connector' = 'filesystem',
                'fpath' = '{sink_dest}',
                'format' = 'csv',
                'sink.rolling-policy.file-size' = '2MB',
                'sink.partition-commit.policy.kind' = 'success-file'
        )
"""

然後將其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)

在程序中使用 JindoFS Flink Connector:更多配置

用戶通過 flink run 提交 java 或 pyflink 程序時,可以額外自定義一些參數,格式:

      flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

目前支持“熵注入”及“分片上傳並行度”兩項配置

熵注入(entropyinjection):

  • 功能:將寫入路徑的一段特定字符串匹配出來,用一段隨機的字符串進行替換
  • 效果:削弱所謂 “片區” (sharding) 效應,提高寫入效率
  • 配置參數:

  ​oss.entropy.key=<user-defined-key>

  oss.entropy.length=<user-defined-length>

分片上傳並行度

  • 配置參數:oss.upload.max.concurrent.uploads
  • 默認值:當前可用的處理器數量

直接觀看第四課(7/8講)視頻回放,獲取實例講解~

https://developer.aliyun.com/live/246851



Github鏈接:

https://github.com/aliyun/alibabacloud-jindofs

不錯過每次直播信息、探討更多數據湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *