
Alibaba Cloud DataWorks: Offline (Batch) Data Synchronization into Kafka

Step By Step

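1. To create the Kafka instance and the exclusive resource group for Data Integration, follow the resource-creation section of this blog post:
DataWorks Real-Time Data Synchronization (Kafka -> MaxCompute)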

2. Configure the Kafka data source in Data Integration and test connectivity
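DataWorks tests connectivity from the selected resource group, so that result is the one that counts. If it fails, a plain TCP reachability check from a host in the same VPC can help narrow the problem down. The Python sketch below is a hypothetical helper (`parse_servers` and `can_connect` are illustrative names, not DataWorks APIs). It only checks that each broker port accepts a TCP connection; it does not speak the Kafka protocol.

```python
import socket
from typing import List, Tuple

# The "server" value used in the Kafka Writer config in step 4.
SERVER = "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092"


def parse_servers(server: str) -> List[Tuple[str, int]]:
    """Split a comma-separated host:port list into (host, port) pairs."""
    pairs = []
    for item in server.split(","):
        host, _, port = item.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


print(parse_servers(SERVER))
```

From a host inside the VPC, `[(h, p, can_connect(h, p)) for h, p in parse_servers(SERVER)]` would show which brokers are reachable.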

3. Create a test table in MaxCompute

-- Two-column test table: key1 will become the Kafka message key, value1 the message value
CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);

-- Insert five test rows
INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");

-- Verify the inserted data
SELECT * FROM odps_to_kafka1;


4. Configure the offline sync script (note: Kafka currently supports only script mode, not wizard mode)

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1"  // name of the table in MaxCompute
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // use the Kafka instance's internal network addresses
                "keyIndex": 0,   // message key = 1st column in the reader's "column" list (key1)
                "valueIndex": 1,  // message value = 2nd column in the reader's "column" list (value1)
                "valueType": "BYTEARRAY",
                "topic": "from_odps1",  // name of the Kafka topic
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
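`keyIndex` and `valueIndex` are positions in the reader's `column` list, not column names. The Python sketch below is a simplified model of that mapping applied to the five test rows. It assumes string columns are encoded as UTF-8 bytes for `BYTEARRAY`; it is not the Kafka Writer implementation, and `to_kafka_record` is a hypothetical name.

```python
from typing import List, Tuple

# Simplified model (not the Kafka Writer source) of how keyIndex/valueIndex
# pick fields from each record produced by the MaxCompute reader.
READER_COLUMNS = ["key1", "value1"]  # same order as the reader's "column" list
KEY_INDEX = 0    # "keyIndex": 0   -> key1
VALUE_INDEX = 1  # "valueIndex": 1 -> value1


def to_kafka_record(row: List[str], key_index: int, value_index: int) -> Tuple[bytes, bytes]:
    """Select key/value columns by position; assume UTF-8 bytes for BYTEARRAY."""
    return row[key_index].encode("utf-8"), row[value_index].encode("utf-8")


# The five rows inserted into odps_to_kafka1 in step 3.
rows = [["key_key%d" % i, "value_value%d" % i] for i in range(1, 6)]
records = [to_kafka_record(row, KEY_INDEX, VALUE_INDEX) for row in rows]

for key, value in records:
    print(READER_COLUMNS[KEY_INDEX], key, READER_COLUMNS[VALUE_INDEX], value)
```

Because the indexes are positional, reordering the reader's `column` list changes which column each index points to, so the two settings have to be kept in sync.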

Note: if saving the script reports that it is not valid JSON, delete the comments and save again.
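Instead of deleting the comments by hand, they can be stripped in a small preprocessing step before the script is pasted in. The Python sketch below is a hypothetical helper (`strip_line_comments` is an illustrative name). It assumes only `//` line comments, as in the script above, and no `/* */` block comments. It skips `//` inside string literals so values such as URLs are not truncated, and it uses `json.loads` to confirm the result parses.

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove // comments that appear outside JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Drop everything up to (but not including) the end of the line.
            nl = text.find("\n", i)
            i = len(text) if nl == -1 else nl
        else:
            out.append(ch)
            i += 1
    return "".join(out)


script = '''{
    "parameter": {
        "server": "192.168.0.67:9092",  // Kafka internal address
        "topic": "from_odps1"  // name of the Kafka topic
    }
}'''

cleaned = strip_line_comments(script)
config = json.loads(cleaned)  # raises json.JSONDecodeError if still invalid
print(config["parameter"]["topic"])
```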

5. Run the sync task

6. Check the synchronized data in the Kafka console
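When comparing the messages in the topic with the source table, it is safer not to rely on order. Kafka orders messages only within a partition, so if the topic has more than one partition, or records are written through more than one concurrent channel, arrival order may differ from the table's order. The Python sketch below compares (key, value) pairs as a multiset. How the messages are consumed (console, CLI, or a client library) is left out, and the `sample` list is hypothetical.

```python
from collections import Counter
from typing import Iterable, Tuple

# Rows inserted into odps_to_kafka1 in step 3.
EXPECTED = [("key_key%d" % i, "value_value%d" % i) for i in range(1, 6)]


def matches_source(
    consumed: Iterable[Tuple[bytes, bytes]],
    expected: Iterable[Tuple[str, str]] = EXPECTED,
) -> bool:
    """Compare consumed (key, value) byte pairs to the source rows, ignoring order."""
    decoded = Counter((k.decode("utf-8"), v.decode("utf-8")) for k, v in consumed)
    return decoded == Counter(expected)


# Hypothetical messages read back from the from_odps1 topic, in arbitrary order.
sample = [
    (b"key_key3", b"value_value3"),
    (b"key_key1", b"value_value1"),
    (b"key_key2", b"value_value2"),
    (b"key_key5", b"value_value5"),
    (b"key_key4", b"value_value4"),
]
print(matches_source(sample))
```

Counting duplicates is deliberate: each run of the batch job appends new messages to the topic, so after a second run the same check would fail with doubled counts.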

More references

Kafka Writer
DataWorks Real-Time Data Synchronization (Kafka -> MaxCompute)
Add and use an exclusive resource group for Data Integration
