背景:數據集成無法同步MongoDB時間戳字段類型實現增量同步。
場景:定時獲取10分鐘的增量數據,MongoDB增量字段為時間戳格式數據。
設置任務依賴實現參數傳遞:
設置節點依賴關係,調度配置都設置10分鐘調度
1、使用兩個賦值節點定義時間戳格式的時間
開始時間:
參數:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
賦值語言選ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
結束時間:
參數:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
賦值語言選ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
2、配置MongoDB同步節點
添加本節點輸入參數 start_time和end_time,取值自上游的兩個賦值節點
MongoDB原始數據:
腳本模式配置示例代碼,源端create_time是double類型,存的時間戳。
"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
腳本配置示例
{
"type": "job",
"steps": [
{
"stepType": "mongodb",
"parameter": {
"datasource": "ds1",
"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
"column": [
{
"name": "doc_id",
"type": "STRING"
},
{
"name": "create_time",
"type": "DOUBLE"
},
{
"name": "date_time",
"type": "DATE"
}
],
"collectionName": "test1"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": false,
"compress": false,
"datasource": "odps_first",
"column": [
"doc_id",
"create_time",
"date_time"
],
"emptyAsNull": false,
"table": "tablename"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"throttle": false,
"concurrent": 2
}
}
}
DataWorks百問百答歷史記錄 請點擊這裡查看>>
更多DataWorks技術和產品信息,歡迎加入【DataWorks釘釘交流群】