Step By Step
1. Create the Kafka instance and the exclusive data integration resource group. For the resource-creation steps, see the blog post:
DataWorks Real-Time Data Sync (Kafka -> MaxCompute)
2. In Data Integration, configure the Kafka data source and test connectivity.
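Connectivity is normally verified with the test button on the DataWorks data source page, but you can also sanity-check that the brokers are reachable from a machine in the same VPC. A minimal sketch using the kafka-python package (the package itself, network access, and the broker addresses, which are the placeholders reused in the step 4 script, are all assumptions):

# pip install kafka-python  (assumed available; any Kafka client works)
from kafka import KafkaConsumer

# Broker addresses must be reachable from the exclusive resource group's VPC;
# these placeholders match the sync script in step 4.
BROKERS = "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092"

# Opening a consumer and listing topics is a cheap reachability check.
consumer = KafkaConsumer(bootstrap_servers=BROKERS.split(","))
print(consumer.topics())  # should include "from_odps1" once the topic exists
consumer.close()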
3. Create a test table in MaxCompute and load a few rows.
-- Create the source table
CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING, value1 STRING);
-- Insert five test rows
INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");
-- Verify the inserted data
SELECT * FROM odps_to_kafka1;
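If you prefer to verify the test data programmatically rather than in the console, here is a minimal PyODPS sketch; the credentials, project name, and endpoint are placeholders you must fill in:

# pip install pyodps  (assumed available)
from odps import ODPS

# Placeholder credentials and endpoint; substitute your own project's values.
o = ODPS("<access_id>", "<access_key>", project="<project>",
         endpoint="https://service.odps.aliyun.com/api")

# Read back the five test rows inserted above.
with o.execute_sql("SELECT * FROM odps_to_kafka1").open_reader() as reader:
    for record in reader:
        print(record["key1"], record["value1"])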
4. Configure the offline sync script (note: Kafka currently supports only script mode, not wizard mode).
{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1" // name of the source table in MaxCompute
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // use the Kafka brokers' internal (VPC) addresses
                "keyIndex": 0, // the record key comes from the first column read from MaxCompute
                "valueIndex": 1, // the record value comes from the second column read from MaxCompute
                "valueType": "BYTEARRAY",
                "topic": "from_odps1", // the destination Kafka topic
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
Note: if saving the script fails with a JSON format error, simply delete the // comments, since standard JSON does not allow them.
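A quick way to catch this before saving is to validate the comment-free script locally with Python's standard json module; a minimal sketch (the file name sync_job.json is a placeholder):

import json

# "sync_job.json" is a placeholder for wherever you keep the script locally.
with open("sync_job.json", encoding="utf-8") as f:
    job = json.load(f)  # raises json.JSONDecodeError if // comments remain

print(job["steps"][0]["parameter"]["table"])  # odps_to_kafka1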
5. Run the sync task.
6. Check the synced data in the Kafka console; you can also consume the topic directly, as sketched below.
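A minimal kafka-python sketch for confirming that the five rows arrived (the broker addresses and topic come from the step 4 script; since the writer used BYTEARRAY for both key and value, they are decoded to text here):

from kafka import KafkaConsumer

BROKERS = "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092"

# Read the topic from the beginning and stop after 5 s of silence.
consumer = KafkaConsumer(
    "from_odps1",
    bootstrap_servers=BROKERS.split(","),
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for msg in consumer:
    # Kafka Writer wrote key/value as BYTEARRAY, so decode to text.
    print(msg.key.decode("utf-8"), msg.value.decode("utf-8"))
consumer.close()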
Further Reading
Kafka Writer
DataWorks Real-Time Data Sync (Kafka -> MaxCompute)
Add and Use an Exclusive Data Integration Resource Group