背景:一些客戶反饋,增量同步數據到MaxCompute按照全天的數據做增量數據同步數據量太大,且不使用按天的增量同步數據,進行在MaxCompute上進行數據處理得出增量數據對於delete的相關數據不能做到很好的支持,在次給大家一個對增量數據同步的方案使用DTS做增量同步數據到MaxCompute,數據源為ECS上自建的mysql5.7。
一、為自建MySQL創建賬號並設置
1.1登陸自建Mysql數據庫
1.2創建mysql數據庫中用於數據遷移/同步的賬號
CREATE USER 'dtsmigration'@'%' IDENTIFIED BY 'Dts123456';
說明:
- username:待創建的賬號。
- host:允許該賬號登錄的主機,如果允許該賬號從任意主機登錄數據庫,可以使用百分號(%)。
- password:賬號的密碼。
1.3對賬號進行授權操作
GRANT privileges ON databasename.tablename TO 'username'@'host' WITH GRANT OPTION;
說明:
- privileges:授予該賬號的操作權限,如SELECT、INSERT、UPDATE等,如果要授予該賬號所有權限,則使用ALL。
- databasename:數據庫名。如果要授予該賬號具備所有數據庫的操作權限,則使用星號(*)。
- tablename:表名。如果要授予該賬號具備所有表的操作權限,則使用星號(*)。
- username:待授權的賬號。
- host:允許該賬號登錄的主機,如果允許該賬號從任意主機登錄,則使用百分號(%)。
- WITH GRANT OPTION:授予該賬號使用GRANT命令的權限,該參數為可選。
如果要給賬戶賦予所有數據庫和表的權限,並容許從任意主機登陸數據庫
GRANT ALL ON *.* TO 'dtsmigration'@'%';
1.4開啟並設置自建Mysql數據庫binlog
a.使用vim命令,修改配置文件my.cnf中的如下參數
log_bin=mysql_bin
binlog_format=row
server_id=2 //設置大於1的整數
binlog_row_image=full //當自建MySQL的版本大於5.6時,則必須設置該項。
b.修改完成後,重啟Mysql進程。
service mysqld restart
二、同步過程介紹
2.1結構初始化
DTS將源庫中待同步表的結構定義信息同步至MaxCompute中,初始化時DTS會為表名增加_base後綴。例如源表為customer,那麼MaxCompute中的表即為customer_base。
2.2全量數據初始化
DTS將源庫中待同步表的存量數據,全部同步至MaxCompute中的目標表名_base表中(例如從源庫的customer表同步至MaxCompute的customer_base表),作為後續增量同步數據的基線數據。
2.3增量數據同步
DTS在MaxCompute中創建一個增量日誌表,表名為同步的目標表名_log,例如customer_log,然後將源庫產生的增量數據實時同步到該表中。
三、增量同步實踐
3.1購買DTS同步
3.2查看購買的DTS同步,點擊配置同步鏈路
3.3配置對應的數據源和相應的MaxCompute項目
3.4點擊授予權限的同步賬號操作
3.5選擇對應的增量同步數據的同步實踐,並選擇需要同步的表
3.6同步配置預檢查
3.7查詢同步的全量數據
3.8查看同步成功的增量數據分區user_log
3.9查看增量數據同步的數據
元數據的字段介紹
字段 | 說明 |
---|---|
record_id | 增量日誌的記錄id,為該日誌唯一標識。 說明 - id的值唯一且遞增。 - 如果增量日誌的操作類型為UPDATE,那麼增量更新會被拆分成兩條記錄,且record_id的值相同。 |
operation_flag | 操作類型,取值: - I:INSERT操作。 - D:DELETE操作。 - U:UPDATE操作。 |
utc_timestamp | 操作時間戳,即binlog的時間戳(UTC 時間)。 |
before_flag | 所有列的值是否為更新前的值,取值:Y或N。 |
after_flag | 所有列的值是否為更新後的值,取值:Y或N。 |
四、根據時間點位,整合該時間點位之前的全量數據
4.1建立全量數據表
CREATE TABLE IF NOT EXISTS maxcomputeone_dev.user_all(uid BIGINT,uname STRING,deptno BIGINT,gender STRING,optime DATETIME,record_id BIGINT,operation_flag STRING,utc_timestamp BIGINT,before_flag STRING,after_flag STRING);
4.2查看增量數據最後同步的點位,最後整合全量數據到user_all
合併語句
set odps.sql.allow.fullscan=true;
insert overwrite table user_all
select uid,
uname,
deptno,
gender,
optime
from(
select row_number() over(partition by t.uid
order by record_id desc, after_flag desc) as record_num, record_id, operation_flag, after_flag, uid, uname, deptno,gender,optime
from(
select incr.record_id, incr.operation_flag, incr.after_flag, incr.uid, incr.uname,incr.deptno,incr.gender,incr.optime
from user_log incr
where utc_timestamp <= 1585107804
union all
select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.uid, base.uname,base.deptno,base.gender,base.optime
from user_base base) t) gt
where record_num=1
and after_flag='Y';
歡迎加入“MaxCompute開發者社區2群”,點擊鏈接申請加入或掃描二維碼
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745