開發與維運

FlinkX 如何讀取和寫入 Clickhouse?

本文將主要介紹 FlinkX 讀取和寫入 Clickhouse 的過程及相關參數,核心內容將圍繞以下3個問題,FlinkX 插件下載:

https://github.com/DTStack/flinkx

  1. FlinkX讀寫Clickhouse支持哪個版本?
  2. ClickHouse讀寫Clickhouse有哪些參數?
  3. ClickHouse讀寫Clickhouse參數都有哪些說明?

ClickHouse 讀取

一、插件名稱

名稱:clickhousereader

二、支持的數據源版本

ClickHouse 19.x及以上

三、參數說明

「jdbcUrl」

  • 描述:針對關係型數據庫的jdbc連接字符串
  • jdbcUrl參考文檔:clickhouse-jdbc官方文檔
  • 必選:是
  • 默認值:無

「username」

  • 描述:數據源的用戶名
  • 必選:是
  • 默認值:無

「password」

  • 描述:數據源指定用戶名的密碼
  • 必選:是
  • 默認值:無

「where」

  • 描述:篩選條件,reader插件根據指定的column、table、where條件拼接SQL,並根據這個SQL進行數據抽取。在實際業務場景中,往往會選擇當天的數據進行同步,可以將where條件指定為gmt_create > time。
  • 注意:不可以將where條件指定為limit 10,limit不是SQL的合法where子句。
  • 必選:否
  • 默認值:無

「splitPk」

  • 描述:當speed配置中的channel大於1時指定此參數,Reader插件根據併發數和此參數指定的字段拼接sql,使每個併發讀取不同的數據,提升讀取速率。注意:推薦splitPk使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。目前splitPk僅支持整形數據切分,不支持浮點、字符串、日期等其他類型。如果用戶指定其他非支持類型,FlinkX將報錯!如果channel大於1但是沒有配置此參數,任務將置為失敗。
  • 必選:否
  • 默認值:無

「fetchSize」

  • 描述:讀取時每批次讀取的數據條數。
  • 注意:此參數的值不可設置過大,否則會讀取超時,導致任務失敗。
  • 必選:否
  • 默認值:1000

「queryTimeOut」

  • 描述:查詢超時時間,單位秒。
  • 注意:當數據量很大,或者從視圖查詢,或者自定義sql查詢時,可通過此參數指定超時時間。
  • 必選:否
  • 默認值:1000

「customSql」

  • 描述:自定義的查詢語句,如果只指定字段不能滿足需求時,可通過此參數指定查詢的sql,可以是任意複雜的查詢語句。注意:只能是查詢語句,否則會導致任務失敗;查詢語句返回的字段需要和column列表裡的字段嚴格對應;當指定了此參數時,connection裡指定的table無效;當指定此參數時,column必須指定具體字段信息,不能以*號代替;
  • 必選:否
  • 默認值:無

「column」

  • 描述:需要讀取的字段。
  • 格式:支持3種格式

1.讀取全部字段,如果字段數量很多,可以使用下面的寫法:

"column":["*"]

2.只指定字段名稱:

"column":["id","name"]

3.指定具體信息:

"column": [{
    "name": "col",
    "type": "datetime",
    "format": "yyyy-MM-dd hh:mm:ss",
    "value": "value"
}]

屬性說明:

  1. name:字段名稱
  2. type:字段類型,可以和數據庫裡的字段類型不一樣,程序會做一次類型轉換
  3. format:如果字段是時間字符串,可以指定時間的格式,將字段類型轉為日期格式返回
  4. value:如果數據庫裡不存在指定的字段,則會報錯。如果指定的字段存在,當指定字段的值為null時,會以此value值作為默認值返回
  • 必選:是
  • 默認值:無

「polling」

  • 描述:是否開啟間隔輪詢,開啟後會根據pollingInterval輪詢間隔時間週期性的從數據庫拉取數據。開啟間隔輪詢還需配置參數pollingInterval,increColumn,可以選擇配置參數startLocation。若不配置參數startLocation,任務啟動時將會從數據庫中查詢增量字段最大值作為輪詢的開始位置。
  • 必選:否
  • 默認值:false

「pollingInterval」

描述:輪詢間隔時間,從數據庫中拉取數據的間隔時間,默認為5000毫秒。
必選:否
默認值:5000

「requestAccumulatorInterval」

  • 描述:發送查詢累加器請求的間隔時間。
  • 必選:否
  • 默認值:2

配置示例

1、基礎配置

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

2、多通道

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

3、指定customSql

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "select id from tableTest",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

4、增量同步指定startLocation

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "increColumn": "id",
          "startLocation": "20",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

5、間隔輪詢

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2,
          "polling": true,
          "pollingInterval": 3000
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

ClickHouse 寫入

一、插件名稱

名稱:clickhousewriter

二、支持的數據源版本

ClickHouse 19.x及以上

三、參數說明

「jdbcUrl」

  • 描述:針對關係型數據庫的jdbc連接字符串
  • 必選:是
  • 默認值:無

「username」

  • 描述:數據源的用戶名
  • 必選:是
  • 默認值:無

「password」

  • 描述:數據源指定用戶名的密碼
  • 必選:是
  • 默認值:無

「column」

  • 描述:目的表需要寫入數據的字段,字段之間用英文逗號分隔。例如: "column": ["id","name","age"]。
  • 必選:是
  • 默認值:否
  • 默認值:無

「preSql」

  • 描述:寫入數據到目的表前,會先執行這裡的一組標準語句
  • 必選:否
  • 默認值:無

「postSql」

  • 描述:寫入數據到目的表後,會執行這裡的一組標準語句
  • 必選:否
  • 默認值:無

「table」

  • 描述:目的表的表名稱。目前只支持配置單個表,後續會支持多表
  • 必選:是
  • 默認值:無

「writeMode」

  • 描述:控制寫入數據到目標表採用 insert into 語句,只支持insert操作
  • 必選:是
  • 所有選項:insert
  • 默認值:insert

「batchSize」

  • 描述:一次性批量提交的記錄數大小,該值可以極大減少FlinkX與數據庫的網絡交互次數,並提升整體吞吐量。但是該值設置過大可能會造成FlinkX運行進程OOM情況
  • 必選:否
  • 默認值:1024

文章來源如下,感興趣的同學可查看原文:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271

更多 Flink 技術問題可在釘釘群交流

Leave a Reply

Your email address will not be published. Required fields are marked *