大數據

DataWorks百問百答28:如何結合賦值節點通過MongoDB時間戳類型字段實現增量同步?

背景:數據集成無法同步MongoDB時間戳字段類型實現增量同步。
場景:定時獲取10分鐘的增量數據,MongoDB增量字段為時間戳格式數據。

設置任務依賴實現參數傳遞:

設置節點依賴關係,調度配置都設置10分鐘調度
data4.png

1、使用兩個賦值節點定義時間戳格式的時間

開始時間:
參數:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
賦值語言選ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
結束時間:
參數:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
賦值語言選ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
data3.png

2、配置MongoDB同步節點

添加本節點輸入參數 start_time和end_time,取值自上游的兩個賦值節點
data2.png

MongoDB原始數據:

腳本模式配置示例代碼,源端create_time是double類型,存的時間戳。
data1.png

"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",

腳本配置示例

{
    "type": "job",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "ds1",
                                "query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
                "column": [
                    {
                        "name": "doc_id",
                        "type": "STRING"
                    },
                    {
                        "name": "create_time",
                        "type": "DOUBLE"
                    },
                    {
                        "name": "date_time",
                        "type": "DATE"
                    }
                ],
                "collectionName": "test1"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": false,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "doc_id",
                    "create_time",
                    "date_time"
                ],
                "emptyAsNull": false,
                "table": "tablename"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}

DataWorks百問百答歷史記錄 請點擊這裡查看>>

更多DataWorks技術和產品信息,歡迎加入【DataWorks釘釘交流群】

Leave a Reply

Your email address will not be published. Required fields are marked *