
DataWorks OpenAPI in Practice: The End-to-End Data Development Workflow

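DataWorks is the operating system of the Apsara big data platform. Over 11 years of development it has grown into a one-stop big data development and governance platform covering data integration, data development, data governance, and data services. Many enterprise users want their on-premises services to interact with DataWorks on Alibaba Cloud, so that big data processing becomes more efficient, manual operations and O&M work are reduced, and data risk and costs go down. DataWorks now exposes OpenAPI capabilities to meet these custom requirements.
DataWorks OpenAPI covers the core capabilities of DataWorks, including tenants, metadata, data development, Operation Center, data quality, and DataService Studio. Enterprise Edition and Ultimate Edition include free quotas of 1 million and 10 million calls per month, respectively.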

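For the activation requirements and supported regions of DataWorks OpenAPI, see the DataWorks OpenAPI overview.
Available only in DataWorks Enterprise Edition and above. Activate now
To request a 7-day trial, scan the DingTalk QR code to contact us.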

[Image: DingTalk QR code]

Scenario Overview

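Consider a simple scenario: a developer wants to synchronize data from an RDS database into a MaxCompute partitioned table, and then display the analyzed report data on a page in an in-house system. How can the whole pipeline be implemented with DataWorks OpenAPI?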

Preparation

1. Add the DataWorks OpenAPI SDK

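For this part, see Install the DataWorks OpenAPI Java SDK. Besides Java, SDKs are also available for Python, PHP, C#, Go, and other languages. By default you do not need to specify the DataWorks OpenAPI endpoint explicitly. However, if your aliyun-java-sdk-core version is old, the SDK may fail to resolve the DataWorks OpenAPI endpoint. In that case, instead of upgrading, you can send requests with the following code: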

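    // Replace the placeholders with your own AccessKey ID and AccessKey Secret
    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "<yourAccessKeyId>", "<yourAccessKeySecret>");
    // Register the DataWorks OpenAPI endpoint explicitly (only needed with older aliyun-java-sdk-core versions)
    DefaultProfile.addEndpoint("cn-shanghai", "dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);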

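The code above explicitly specifies the DataWorks OpenAPI endpoint. Domains of the form dataworks.${regionId}.aliyuncs.com are reachable over the Internet. If you need to call the OpenAPI from a VPC, change the domain from dataworks.${regionId}.aliyuncs.com to dataworks-vpc.${regionId}.aliyuncs.com; requests from the VPC can then reach DataWorks OpenAPI even without Internet access.
If you are not familiar with the concept of regionId (region ID), see Regions and zones.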
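As a small illustration of the two domain formats above, the following sketch builds the endpoint domain from a region ID. The class name `DataWorksEndpoints` is hypothetical and not part of the SDK; only the two domain patterns come from this article.

```java
// Sketch: derive the DataWorks OpenAPI domain for a region, following the
// dataworks.${regionId}.aliyuncs.com (Internet) and
// dataworks-vpc.${regionId}.aliyuncs.com (VPC) patterns.
// DataWorksEndpoints is a hypothetical helper, not an SDK class.
final class DataWorksEndpoints {
    private DataWorksEndpoints() {}

    /** Returns the Internet or VPC endpoint domain for the given region ID. */
    static String forRegion(String regionId, boolean vpc) {
        if (regionId == null || regionId.isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        String prefix = vpc ? "dataworks-vpc" : "dataworks";
        return prefix + "." + regionId + ".aliyuncs.com";
    }

    public static void main(String[] args) {
        System.out.println(forRegion("cn-shanghai", false)); // dataworks.cn-shanghai.aliyuncs.com
        System.out.println(forRegion("cn-shanghai", true));  // dataworks-vpc.cn-shanghai.aliyuncs.com
    }
}
```

The returned string is what you would pass as the third argument of `DefaultProfile.addEndpoint` in the snippet above.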

2. Read the DataWorks OpenAPI documentation

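Reading the DataWorks OpenAPI documentation carefully helps a great deal during development. Whenever you are unsure about the constraints on a parameter, consult it: every request and response parameter is explained in detail, together with parameter examples and error code descriptions. View the API reference >>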


Steps

Step 1: Create an RDS data source

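The tenant APIs let you create engines and data sources and view workspace information. In this scenario, the MaxCompute partitioned table lives in the MaxCompute engine. When you create a MaxCompute workspace in the DataWorks console, the data source for the MaxCompute engine is created automatically, so we only need to call CreateConnection to create the RDS data source: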

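        // Create an RDS (MySQL) data source with CreateConnection
        CreateConnectionRequest createRequest = new CreateConnectionRequest();

        createRequest.setProjectId(-1L);                  // replace with your workspace ID
        createRequest.setName("TEST_CONNECTION");
        createRequest.setConnectionType("MYSQL");
        createRequest.setEnvType(1);                      // environment type; see the CreateConnection reference for valid values
        // Content is the connection configuration as JSON. Only the password is shown here;
        // a real RDS data source needs the full set of fields listed in the API reference.
        createRequest.setContent("{\"password\":\"12345\"}");
        Long connectionId;

        try {
            CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
            Assert.assertNotNull(createResponse.getData());
            connectionId = createResponse.getData();      // ID of the new data source

            // UpdateConnection: change the description of the data source
            UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
            updateRequest.setConnectionId(connectionId);
            updateRequest.setDescription("1");
            UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
            Assert.assertTrue(acsResponse.getData());

            // DeleteConnection: remove the data source again (the original test cleans up after itself)
            DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
            deleteRequest.setConnectionId(connectionId);
            DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
            Assert.assertTrue(deleteResponse.getData());
        } catch (ClientException e) {
            Assert.fail("Connection API call failed: " + e.getMessage());
        }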

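UpdateConnection and DeleteConnection modify and delete data source information, respectively; the sample calls both only to demonstrate them and to clean up, so skip them if you want to keep the new data source. The APIs for managing workspace members are CreateProjectMember, DeleteProjectMember, RemoveProjectMemberFromRole, and ListProjectMembers.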
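The sample above only reaches DeleteConnection if the update succeeds, so a failed assertion leaves a stray data source behind. The sketch below moves the cleanup into a `finally` block. It assumes each API call is wrapped behind a small interface; `ConnectionApi` and `InMemoryConnectionApi` are hypothetical stand-ins for `client.getAcsResponse(...)` calls so the flow can run offline.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: create -> update -> delete, with delete guaranteed by finally.
// ConnectionApi and InMemoryConnectionApi are hypothetical, not SDK types.
final class ConnectionLifecycle {
    /** Hypothetical wrapper around CreateConnection, UpdateConnection and DeleteConnection. */
    interface ConnectionApi {
        long create(String name, String connectionType);
        boolean update(long connectionId, String description);
        boolean delete(long connectionId);
    }

    /** In-memory fake so the flow can run without calling DataWorks. */
    static class InMemoryConnectionApi implements ConnectionApi {
        final Map<Long, String> descriptions = new HashMap<>();
        private long nextId = 1;

        @Override
        public long create(String name, String connectionType) {
            long id = nextId++;
            descriptions.put(id, "");
            return id;
        }

        @Override
        public boolean update(long connectionId, String description) {
            if (!descriptions.containsKey(connectionId)) {
                return false;
            }
            descriptions.put(connectionId, description);
            return true;
        }

        @Override
        public boolean delete(long connectionId) {
            return descriptions.remove(connectionId) != null;
        }
    }

    /** Creates a temporary connection, updates it, and deletes it even if the update fails. */
    static boolean runTemporary(ConnectionApi api) {
        long id = api.create("TEST_CONNECTION", "MYSQL");
        try {
            return api.update(id, "1");
        } finally {
            api.delete(id); // cleanup always runs
        }
    }

    public static void main(String[] args) {
        InMemoryConnectionApi api = new InMemoryConnectionApi();
        System.out.println(runTemporary(api));          // true
        System.out.println(api.descriptions.isEmpty()); // true: nothing left behind
    }
}
```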

Step 2: Create the table

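The DataWorks metadata OpenAPI lets us manage table information on the engine side. Using the DataWorks console and the tenant APIs, we have created the MaxCompute engine and the RDS data source. The next step is to create the table, which is done with the metadata API CreateTable: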

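        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "<yourAccessKeyId>", "<yourAccessKeySecret>");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateTableRequest request = new CreateTableRequest();
        request.setTableName("table_test");
        // Column definitions go here; the list is left empty in this sample
        request.setColumnss(new ArrayList<>());
        request.setEndpoint("endpoint");                  // placeholder: replace with the actual endpoint value
        // getAcsResponse throws ClientException; handle it as in Step 1
        CreateTableResponse response = client.getAcsResponse(request);
        // CreateTable returns task information; NextTaskId identifies the follow-up task
        String nextTaskId = response.getTaskInfo().getNextTaskId();
        System.out.println(nextTaskId);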

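The table management APIs include CreateTable, UpdateTable, DeleteTable, GetMetaDBTableList, CheckMetaTable, and others. Besides managing tables, the metadata APIs can also manage table metadata and table themes. For details, see the DataWorks OpenAPI documentation.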
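Before calling CreateTable it can help to review the intended schema as DDL. The sketch below keeps a hypothetical column list (name, MaxCompute type, partition flag) and renders the equivalent MaxCompute `CREATE TABLE ... PARTITIONED BY` statement for review; the same list would then be mapped onto CreateTable's column parameters, whose exact field names should be taken from the API reference. `ColumnDef` and the column names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: render MaxCompute DDL for a partitioned table from a column list.
// ColumnDef and PartitionedTableDdl are hypothetical helpers.
final class PartitionedTableDdl {
    /** Hypothetical column definition: name, MaxCompute type, and partition flag. */
    static final class ColumnDef {
        final String name;
        final String type;
        final boolean partition;

        ColumnDef(String name, String type, boolean partition) {
            this.name = name;
            this.type = type;
            this.partition = partition;
        }
    }

    /** Renders CREATE TABLE DDL, putting partition columns in PARTITIONED BY. */
    static String render(String table, List<ColumnDef> columns) {
        List<String> dataColumns = new ArrayList<>();
        List<String> partitionColumns = new ArrayList<>();
        for (ColumnDef c : columns) {
            String definition = c.name + " " + c.type;
            if (c.partition) {
                partitionColumns.add(definition);
            } else {
                dataColumns.add(definition);
            }
        }
        if (dataColumns.isEmpty()) {
            throw new IllegalArgumentException("a table needs at least one non-partition column");
        }
        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(table)
                .append(" (").append(String.join(", ", dataColumns)).append(")");
        if (!partitionColumns.isEmpty()) {
            ddl.append(" PARTITIONED BY (").append(String.join(", ", partitionColumns)).append(")");
        }
        return ddl.append(";").toString();
    }

    public static void main(String[] args) {
        List<ColumnDef> columns = List.of(
                new ColumnDef("id", "BIGINT", false),
                new ColumnDef("name", "STRING", false),
                new ColumnDef("ds", "STRING", true));
        // CREATE TABLE IF NOT EXISTS table_test (id BIGINT, name STRING) PARTITIONED BY (ds STRING);
        System.out.println(render("table_test", columns));
    }
}
```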

Step 3: Develop tasks and publish them to the scheduler

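The data development APIs manage files. Once a file is committed and deployed, it becomes a recurring task that the scheduler runs on a schedule. The type of file created is determined by the FileType field. Many FileType values are already supported, and the Operation Center API ListProgramTypeCount returns all supported system nodes and custom nodes.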

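        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "<yourAccessKeyId>", "<yourAccessKeySecret>");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateFileRequest createFileRequest = new CreateFileRequest();
        // FileType selects the node type; DefaultNodeType is a helper enum from the author's test code (ODPS SQL here)
        createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());
        // Make the new node depend on the workspace root node, whose output is named <workspace>_root
        createFileRequest.setInputList(projectIdentifier + "_root");
        createFileRequest.setContent(content);            // script text (SQL, Shell, or data integration)
        createFileRequest.setFileName("create_file_" + caseId);
        // Folder path in the workspace ("Business Flow/POP API test/MaxCompute/test_folder_3")
        createFileRequest.setFileFolderPath("業務流程/POP接口測試/MaxCompute/test_folder_3");
        createFileRequest.setFileDescription("create file " + caseId);
        createFileRequest.setRerunMode("ALL_ALLOWED");
        CreateFileResponse createFileResponse = client.getAcsResponse(createFileRequest);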

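The content field stores the script code: an SQL script, a Shell script, or a data integration script. For the data integration script format, see Configure a sync task in script mode.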
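For an ODPS SQL file, content is plain SQL text. The sketch below assembles a statement that rewrites one partition per scheduled run. It assumes the node defines a scheduling parameter named `bizdate`, which DataWorks substitutes for `${bizdate}` at run time; the table and column names are hypothetical.

```java
import java.util.List;

// Sketch: build the content string for an ODPS SQL file.
// ${bizdate} is intentionally left unresolved: it is assumed to be a
// scheduling parameter configured on the node and replaced by DataWorks.
final class OdpsSqlContent {
    /** INSERT OVERWRITE into one partition, selecting only non-partition columns. */
    static String dailyOverwrite(String target, String source, List<String> columns, String partitionColumn) {
        String partition = partitionColumn + "='${bizdate}'";
        return "INSERT OVERWRITE TABLE " + target + " PARTITION (" + partition + ")\n"
                + "SELECT " + String.join(", ", columns) + "\n"
                + "FROM " + source + "\n"
                + "WHERE " + partition + ";";
    }

    public static void main(String[] args) {
        // Hypothetical source table synchronized from RDS
        String content = dailyOverwrite("table_test", "ods_rds_orders", List.of("id", "name"), "ds");
        System.out.println(content); // pass this string to createFileRequest.setContent(...)
    }
}
```

Listing the columns explicitly, rather than `SELECT *`, keeps the partition column out of the select list, which a static-partition `INSERT OVERWRITE` requires.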
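After creating a script with CreateFile, you can manage it further with UpdateFile and DeleteFile. As in the console, a finished file must be committed and deployed before recurring instances are generated. Note that you must poll with the DeploymentId returned by SubmitFile: the commit has succeeded only when GetDeployment reports a finished status (status.finished()).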

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "<yourAccessKeyId>", "<yourAccessKeySecret>");
        IAcsClient client = new DefaultAcsClient(profile);
        SubmitFileRequest submitFileRequest = new SubmitFileRequest();
        submitFileRequest.setFileId(fileId);
        submitFileRequest.setComment("submit file");
        SubmitFileResponse submitFileResponse = client.getAcsResponse(submitFileRequest);
        // SubmitFile returns the ID of the deployment to poll
        Long deploymentId = submitFileResponse.getData();

        // Check the commit result: poll GetDeployment up to 6 times, 10 seconds apart.
        // DeploymentStatus, Enums and DatastudioEnvironment are helper types from the author's
        // test code; baseId comes from the same test setup.
        DeploymentStatus status = null;
        GetDeploymentResponse.Data.Deployment deployment = null;
        int retryTimes = 0;
        while (retryTimes < 6) {
            GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
            getDeploymentRequest.setDeploymentId(deploymentId);
            GetDeploymentResponse getDeploymentResponse = client.getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
            Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
            Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            TimeUnit.SECONDS.sleep(10L);
        }
        // Without this check, a deployment that never finishes would pass silently
        Assert.assertTrue("Deployment did not finish in time", status != null && status.finished());
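The retry loop above gives up after six attempts. A reusable sketch of the same polling pattern follows: it takes a status supplier (standing in for one GetDeployment call) and a "finished" predicate (standing in for `status.finished()`), and throws once the attempts run out instead of falling through. The class name and the simulated status strings are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch: bounded polling with a fixed sleep between attempts.
// DeploymentPoller and the status strings used in main are hypothetical.
final class DeploymentPoller {
    /**
     * Calls fetchStatus until finished accepts the result, sleeping between attempts.
     * Throws IllegalStateException if the last attempt is still unfinished.
     */
    static <S> S pollUntil(Supplier<S> fetchStatus, Predicate<S> finished, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        S status = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = fetchStatus.get(); // e.g. one GetDeployment call
            if (finished.test(status)) {
                return status;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        throw new IllegalStateException(
                "Deployment not finished after " + maxAttempts + " attempts, last status: " + status);
    }

    public static void main(String[] args) {
        // Simulated statuses; a real supplier would call GetDeployment
        Iterator<String> statuses = List.of("RUNNING", "RUNNING", "SUCCESS").iterator();
        String result = pollUntil(statuses::next, "SUCCESS"::equals, 6, 10L);
        System.out.println(result); // SUCCESS
    }
}
```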

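If you develop in a workspace in standard mode, committing is not enough: the file must also be deployed before it reaches the scheduler as a recurring task. Deploy the file with DeployFile and, as with committing, poll the deployment status with GetDeployment.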

    DeployFileRequest deployFileRequest = new DeployFileRequest();
    deployFileRequest.setFileId(fileId);
    deployFileRequest.setComment("deploy file");
    DeployFileResponse deployFileResponse = client.getAcsResponse(deployFileRequest);
    // DeployFile also returns the ID of the deployment to poll
    Long deploymentId = deployFileResponse.getData();

    // Check the deployment result with the same GetDeployment polling loop
    DeploymentStatus status = null;
    GetDeploymentResponse.Data.Deployment deployment = null;
    int retryTimes = 0;
    while (retryTimes < 6) {
        GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
        getDeploymentRequest.setDeploymentId(deploymentId);
        GetDeploymentResponse getDeploymentResponse = client.getAcsResponse(getDeploymentRequest);
        LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
        Assert.assertNotNull(getDeploymentResponse.getData());
        deployment = getDeploymentResponse.getData().getDeployment();
        Assert.assertNotNull(deployment);
        LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                deploymentId, new Gson().toJson(deployment));
        Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
        Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
        status = Enums.find(DeploymentStatus.class, deployment.getStatus());
        Assert.assertNotNull(status);
        if (status.finished()) {
            LOGGER.info("Deployment finished - FinalStatus[{}]", status);
            break;
        }
        LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
        retryTimes++;
        TimeUnit.SECONDS.sleep(10L);
    }
    Assert.assertTrue("Deployment did not finish in time", status != null && status.finished());
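Putting the two steps together: every file is committed with SubmitFile, and only in a standard-mode workspace is it then deployed with DeployFile, each step being confirmed by polling GetDeployment. The sketch below expresses that order against a hypothetical `FileReleaseApi` interface, with a recording fake in place of the real SDK calls.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: commit, then (standard mode only) deploy, polling after each step.
// FileReleaseApi and RecordingApi are hypothetical, not SDK types.
final class FileRelease {
    interface FileReleaseApi {
        long submitFile(long fileId);                  // returns a deployment ID
        long deployFile(long fileId);                  // returns a deployment ID
        boolean deploymentFinished(long deploymentId); // one GetDeployment check
    }

    static void release(FileReleaseApi api, long fileId, boolean standardMode, int maxPolls) {
        awaitDeployment(api, api.submitFile(fileId), maxPolls);
        if (standardMode) {
            // Only standard-mode workspaces need the extra deploy step
            awaitDeployment(api, api.deployFile(fileId), maxPolls);
        }
    }

    private static void awaitDeployment(FileReleaseApi api, long deploymentId, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (api.deploymentFinished(deploymentId)) {
                return;
            }
            // A real client would sleep between polls, as in the sample above
        }
        throw new IllegalStateException("Deployment " + deploymentId + " did not finish");
    }

    /** Fake that records calls and reports each deployment finished on its second poll. */
    static final class RecordingApi implements FileReleaseApi {
        final List<String> calls = new ArrayList<>();
        private long nextDeploymentId = 100;
        private int polls;

        @Override
        public long submitFile(long fileId) { calls.add("SubmitFile"); polls = 0; return nextDeploymentId++; }

        @Override
        public long deployFile(long fileId) { calls.add("DeployFile"); polls = 0; return nextDeploymentId++; }

        @Override
        public boolean deploymentFinished(long deploymentId) {
            calls.add("GetDeployment");
            return ++polls >= 2;
        }
    }

    public static void main(String[] args) {
        RecordingApi api = new RecordingApi();
        release(api, 1L, true, 6);
        // [SubmitFile, GetDeployment, GetDeployment, DeployFile, GetDeployment, GetDeployment]
        System.out.println(api.calls);
    }
}
```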

Besides files, the data development APIs can also manage folders, resources, and functions. For details, see the DataWorks OpenAPI documentation.

Step 4: Configure O&M monitoring

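Once the recurring task has been created through the APIs, DataWorks generates scheduling instances for it every day and runs them on schedule. The Operation Center APIs support O&M on recurring tasks and their instances: use GetNode, GetInstance, ListInstances, and similar APIs to inspect recurring tasks and instances and to monitor how the instances run.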

        // Query one instance; INSTANCE_ID and PROJECT_ENV are constants from the author's test
        GetInstanceRequest request = new GetInstanceRequest();
        request.setInstanceId(INSTANCE_ID);
        request.setProjectEnv(PROJECT_ENV);
        try {
            GetInstanceResponse response = client.getAcsResponse(request);
            // ReturnModelParser, GsonUtils and BizInstanceDto are test helpers that unwrap the response
            Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
            BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
            // Expected values from the author's test fixture; times are epoch milliseconds
            Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
            Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
            Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
            Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
            Assert.assertEquals("", bizInstanceDto.getParamValues());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
            Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
            Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
            Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
            Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
        } catch (Exception e) {
            Assert.fail("GetInstance failed: " + e.getMessage());
        }
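GetInstance returns timestamps as epoch milliseconds. Converting the values asserted in the sample above makes them easier to read and shows that, for this daily instance, the business date (Bizdate) is the day before the scheduled cycle time (CycTime). The sketch assumes the Asia/Shanghai time zone, matching the cn-shanghai region used throughout; `InstanceTimes` is a hypothetical helper.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Sketch: convert GetInstance epoch-millisecond timestamps to local time.
final class InstanceTimes {
    // Assumption: the workspace uses the Asia/Shanghai time zone (region cn-shanghai)
    static final ZoneId SHANGHAI = ZoneId.of("Asia/Shanghai");

    /** Converts an epoch-millisecond timestamp to a local date-time in SHANGHAI. */
    static LocalDateTime toLocal(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(SHANGHAI).toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime cycTime = toLocal(1590422400000L);           // CycTime from the sample
        LocalDate bizdate = toLocal(1590336000000L).toLocalDate(); // Bizdate from the sample
        System.out.println(cycTime);                               // 2020-05-26T00:00
        System.out.println(bizdate);                               // 2020-05-25
        System.out.println(bizdate.plusDays(1).equals(cycTime.toLocalDate())); // true
    }
}
```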

If an instance runs abnormally, handle it with RestartInstance, SetSuccessInstance, SuspendInstance, or ResumeInstance.
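A monitoring job usually turns the instance status into one of the actions above. The sketch below is a hypothetical triage policy: the status strings it checks (`SUCCESS`, `FAILURE`) are assumptions to confirm against the GetInstance reference, and SuspendInstance/ResumeInstance are left out for brevity.

```java
// Sketch: map an instance status to a follow-up action.
// InstanceTriage, Action and the checked status strings are assumptions.
final class InstanceTriage {
    /** Hypothetical follow-up actions, named after the Operation Center APIs they would call. */
    enum Action { NONE, KEEP_MONITORING, RESTART_INSTANCE, SET_SUCCESS_INSTANCE }

    static Action decide(String status, boolean failureIsHarmless) {
        if ("SUCCESS".equals(status)) {
            return Action.NONE;
        }
        if ("FAILURE".equals(status)) {
            // Rerun a genuine failure; mark it successful only if a person judged it harmless
            return failureIsHarmless ? Action.SET_SUCCESS_INSTANCE : Action.RESTART_INSTANCE;
        }
        // Not yet run, waiting, or running: nothing to fix yet
        return Action.KEEP_MONITORING;
    }

    public static void main(String[] args) {
        System.out.println(decide("NOT_RUN", false)); // KEEP_MONITORING
        System.out.println(decide("FAILURE", false)); // RESTART_INSTANCE
        System.out.println(decide("FAILURE", true));  // SET_SUCCESS_INSTANCE
    }
}
```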
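APIs such as CreateRemind and UpdateRemind create custom alert rules that help ensure baselines finish on time every day: when something goes wrong, the alert notifies the responsible person, who can then intervene.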

        // PopRemindUnit, RemindType, PopAlertUnit, PopAlertMethod and MosadConstants are enums/constants from the author's test code
        CreateRemindRequest createRemindRequest = new CreateRemindRequest();
        createRemindRequest.setRemindName("REMIND_CREATE_TEST");
        createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());    // the rule monitors nodes
        createRemindRequest.setRemindType(RemindType.ERROR.name());      // alert when a node fails
        createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
        createRemindRequest.setDndEnd("08:00");                          // end of the do-not-disturb period
        createRemindRequest.setNodeIds("-1");                            // replace with the IDs of the nodes to monitor
        createRemindRequest.setMaxAlertTimes(1);
        createRemindRequest.setAlertInterval(1800);                      // interval between alerts, in seconds
        createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
        createRemindRequest.setAlertTargets(MosadConstants.POP_UID);     // alert recipient
        try {
            CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
            MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
            Assert.assertTrue(createResponse.getData() > 0);             // ID of the new alert rule
        } catch (Exception ex) {
            Assert.fail("CreateRemind failed: " + ex.getMessage());
        }

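Operation Center mainly provides APIs for recurring tasks, manually triggered workflows, baseline queries, and alert configuration and queries. See the DataWorks OpenAPI documentation.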

Step 5: Configure data quality monitoring

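In this scenario, the APIs introduced so far already synchronize data from RDS into the MaxCompute table on a daily schedule. If we are concerned that dirty or missing data could affect online business, we can integrate DataWorks data quality monitoring through the data quality APIs: when the table's output is abnormal, the rule subscribers are notified immediately.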

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();
        request.setBlockType(0);
        request.setComment("test-createTemplateRuleSuccess");
        request.setCriticalThreshold("50");              // critical alert threshold
        request.setEntityId(entityId);                   // ID of the monitored entity (table partition expression)
        request.setOperator("abs");
        request.setPredictType(0);
        request.setProjectName(PROJECT_NAME);
        request.setProperty("table_count");              // metric checked by the rule: table row count
        request.setPropertyType("table");
        request.setRuleName("createTemplateRuleSuccess");
        request.setRuleType(0);
        request.setTemplateId(7);                        // rule template used in the original test
        request.setWarningThreshold("10");               // warning threshold
        try {
            CreateQualityRuleResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
            Long templateRuleId = Long.parseLong(data.toString());
            Assert.assertTrue(templateRuleId > 0);
            return templateRuleId;                       // ID of the new rule
        } catch (Exception e) {
            Assert.fail("CreateQualityRule failed: " + e.getMessage());
            return null;
        }
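The rule above sets WarningThreshold to 10 and CriticalThreshold to 50 on the `table_count` property with the `abs` operator. The sketch below shows one plausible reading of such a rule, assuming the thresholds are percentages and the check compares the absolute change in row count against a baseline; the exact semantics of template 7 should be confirmed in the data quality documentation. `QualityThresholds` is a hypothetical helper.

```java
// Sketch: classify a row-count change against warning/critical thresholds.
// Assumption: thresholds are percentages of absolute change versus a baseline,
// and a level triggers when the change strictly exceeds its threshold.
final class QualityThresholds {
    enum Level { NORMAL, WARNING, CRITICAL }

    /** Absolute change of the current row count relative to the baseline, in percent. */
    static double absChangePercent(long baselineCount, long currentCount) {
        if (baselineCount <= 0) {
            throw new IllegalArgumentException("baseline row count must be positive");
        }
        return Math.abs(currentCount - baselineCount) * 100.0 / baselineCount;
    }

    static Level evaluate(long baselineCount, long currentCount, double warning, double critical) {
        double change = absChangePercent(baselineCount, currentCount);
        if (change > critical) {
            return Level.CRITICAL;
        }
        if (change > warning) {
            return Level.WARNING;
        }
        return Level.NORMAL;
    }

    public static void main(String[] args) {
        // Thresholds from the sample: WarningThreshold "10", CriticalThreshold "50"
        double warning = Double.parseDouble("10");
        double critical = Double.parseDouble("50");
        System.out.println(evaluate(1000, 1050, warning, critical)); // NORMAL (5% change)
        System.out.println(evaluate(1000, 1200, warning, critical)); // WARNING (20% change)
        System.out.println(evaluate(1000, 400, warning, critical));  // CRITICAL (60% change)
    }
}
```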

Data quality APIs such as CreateQualityRule, GetQualityFollower, and CreateQualityRelativeNode manage data quality rules. For more data quality APIs, see the DataWorks OpenAPI documentation.

Step 6: Generate a DataService Studio API

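We created the table with the metadata API, created the file and the recurring task with the data development APIs, and configured monitoring rules with the data quality and Operation Center APIs, so the MaxCompute partitioned table is now populated reliably. The last step is to use the DataService Studio OpenAPI to turn the partitioned table's data into a data service API that serves our system.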

        // tenantId, projectId, apiMode, apiName, apiPath, groupId, visibleRange, protocols,
        // requestMethod and responseType are defined elsewhere in the author's test
        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
        createRequest.setTenantId(tenantId);
        createRequest.setProjectId(projectId);
        createRequest.setApiMode(apiMode);
        createRequest.setApiName(apiName);
        createRequest.setApiPath(apiPath);
        createRequest.setApiDescription("test");
        createRequest.setGroupId(groupId);
        createRequest.setVisibleRange(visibleRange);
        createRequest.setTimeout(10000);
        createRequest.setProtocols(protocols);
        createRequest.setRequestMethod(requestMethod);
        createRequest.setResponseContentType(responseType);

        CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
        Long apiId = createResponse.getData();           // ID of the new data service API
        Assert.assertNotNull(apiId);

        // Read the API back to verify it was created
        GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
        getRequest.setTenantId(tenantId);
        getRequest.setProjectId(projectId);
        getRequest.setApiId(apiId);
        GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
        GetDataServiceApiResponse.Data data = getResponse.getData();
        Assert.assertEquals(apiId, data.getApiId());
        Assert.assertEquals(0L, data.getFolderId().longValue());

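CreateDataServiceApi and PublishDataServiceApi turn table data into a data service API, which completes the data production pipeline. By integrating the DataWorks OpenAPI calls above, the on-premises system is seamlessly connected to the cloud.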
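To recap the whole pipeline, the sketch below lists the OpenAPI operations this walkthrough calls, in order, as data that a local orchestrator could iterate over. `PipelinePlan` is illustrative only; DeployFile is dropped for workspaces that are not in standard mode, as described in Step 3.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the call order used in this walkthrough. PipelinePlan is hypothetical.
final class PipelinePlan {
    /** The DataWorks OpenAPI operations used in this walkthrough, in call order. */
    static final List<String> OPERATIONS = List.of(
            "CreateConnection",      // Step 1: RDS data source
            "CreateTable",           // Step 2: MaxCompute table
            "CreateFile",            // Step 3: node script
            "SubmitFile",            // Step 3: commit, then poll GetDeployment
            "DeployFile",            // Step 3: standard mode only, then poll GetDeployment
            "CreateRemind",          // Step 4: alert rule
            "CreateQualityRule",     // Step 5: data quality rule
            "CreateDataServiceApi",  // Step 6: define the API
            "PublishDataServiceApi"  // Step 6: publish it
    );

    /** Returns the call order, dropping DeployFile for workspaces not in standard mode. */
    static List<String> operations(boolean standardMode) {
        List<String> result = new ArrayList<>(OPERATIONS);
        if (!standardMode) {
            result.remove("DeployFile");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(operations(true).size());  // 9
        System.out.println(operations(false).size()); // 8
    }
}
```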

API debugging tool

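Every published DataWorks API can be debugged online, and the tool generates source code for your calls as you go, which greatly speeds up OpenAPI development. We strongly recommend it. Go to DataWorks OpenAPI debugging >>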

Summary

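As the saying goes, a craftsman who wants to do good work must first sharpen his tools. DataWorks OpenAPI, officially released in 2020, is a tool for making enterprise data development more efficient: through the OpenAPI, enterprises can use DataWorks capabilities far more flexibly. More than 150 OpenAPI operations have been released so far, and more are being added. This hands-on article aims to help enterprise users get started with DataWorks OpenAPI quickly and see what it can do through a scenario-based walkthrough. More articles in the series are on the way; thank you for following!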

