大數據

Zeppelin SDK :Flink 平臺建設的基石

作者:章劍鋒(簡鋒),阿里巴巴高級技術專家

用過 Zeppelin 的人應該比較熟悉 Zeppelin 的 UI,因為 Zeppelin 的主要使用場景都是交互式,用戶需要手動來操作。那除了這種手動的方式,還有其他的方式嗎?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大數據作業 (比如 Flink Job)的能力該怎麼辦?或者是你在 Zeppelin 裡寫好了代碼,想定時調度起來,或者集成到其他系統裡,該怎麼辦?

如果你有這樣的訴求,那麼 Zeppelin Client API (SDK)就是你所需要的東西。

Zeppelin 簡介

對於不熟悉 Zeppelin 的人,可以用一句話來解釋 Zeppelin:大數據引擎的入口,交互式大數據分析平臺底座。Zeppelin 最大的特點是連接多種引擎,具有可插拔式,下面這張圖例舉了一些常用的引擎,當然 Zeppelin 還支持其他很多引擎,這裡就不一一例舉。

image3.png

雖然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,對於很多不熟悉 Zeppelin 的人來說使用 Rest API 門檻太高,所以 Zeppelin 專門開發了一個 Client API (SDK),方便大家做集成。Zeppelin Client API (SDK)分為 2 個層面的的東西(接下來會逐個詳細介紹):

  • Zeppelin Client API (Low Level API)
  • Session API (High Level API)

Zeppelin Client API (Low Level API)

Zeppelin Client API 可以在 Note 和 Paragraph 的粒度進行操作。你可以先在 notebook 裡寫好代碼 (比如開發階段在 notebook 裡寫代碼,做測試),然後用 Low Level API 用編程的方式把 Job 跑起來(比如生產階段把作業定時調度起來)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例舉幾個重要的接口(這些 API 都比較直觀,我就不多做解釋了)。

public String createNote(String notePath) throws Exception 

public void deleteNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId, 
                              Map<String, String> parameters) throws Exception
                              
public NoteResult queryNoteResult(String noteId) throws Exception 

public NoteResult submitNote(String noteId) throws Exception

public NoteResult submitNote(String noteId, 
                             Map<String, String> parameters) throws Exception 
                             
public NoteResult waitUntilNoteFinished(String noteId) throws Exception

public String addParagraph(String noteId, 
                           String title, 
                           String text) throws Exception
                           
public void updateParagraph(String noteId, 
                            String paragraphId, 
                            String title, 
                            String text) throws Exception
                            
public ParagraphResult executeParagraph(String noteId,
                                        String paragraphId,
                                        String sessionId,
                                        Map<String, String> parameters) throws Exception
                                        
public ParagraphResult submitParagraph(String noteId,
                                       String paragraphId,
                                       String sessionId,
                                       Map<String, String> parameters) throws Exception
                                       
public void cancelParagraph(String noteId, String paragraphId)
    
public ParagraphResult queryParagraphResult(String noteId, String paragraphId) 
    
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那這些 API 能用來做什麼呢?

一個典型的用途是我們在 Zeppelin 裡寫好代碼,做好測試,然後在第三方系統裡集成進來。比如下面的代碼就是把 Zeppelin 自帶的 Spark Basic Features 用編程的方式跑起來,你不僅可以跑 Zeppelin Note,還可以拿到運行結果 (ParagraphResult)。怎麼處理運行結果,就留給你發揮想象的空間吧(可以在你的系統裡展示出來,或者可視化出來,或者傳給其他系統做消費等等)。

此外,對於 Dynamic forms(動態控件,比如文本框,下拉框等等),你還可以動態的提供參數,如下面例子裡的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);

這下面這張圖就是上面我們要 Zeppelin Client API 跑的 Zeppelin 自帶的 Spark Basic Features。

image2.png

Session API (High Level API)

Session API 是 Zeppelin 的high level api,Session API 裡沒有 Note,Paragraph 的概念,粒度是你提交的代碼。Session API裡最重要的class就是 ZSession,這也是Session API的入口,一個 ZSession 代表一個獨立的Zeppelin Interpreter 進程,對於 Flink 來說就是一個獨立的 Flink Session Cluster。下面例舉一些典型的接口(這些 API 都比較直觀,我就不多做解釋了)。

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<< span="">String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<< span="">String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

那這個 API 能用來做什麼呢? 一個典型的用途是就是我們動態創建 Session (Zeppelin Interpreter 進程),動態的提交運行代碼,並拿到運行結果。比如你不想用 Zeppelin 的 UI,要自己做一個 Flink 的開發管理平臺,那麼你就可以自己做 UI,讓用戶在 UI 上配置 Flink Job,輸入 SQL,然後把所有的這些信息發送到後端,後端調用 ZSession 來運行 Flink Job。

下面的 Java 代碼就是用編程的方式調用了 2 條 Flink SQL 語句,並且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中讀取源源不斷髮送過來更新的 SQL 運行結果 (怎麼來使用這個結果就靠你的想象力了)。

需要說明的是像 Flink Interpreter 這種流式結果數據更新是通過 WebSocket 實現的,所以下面的代碼裡有會有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,這些 MessageHandler 就是用來處理通過 WebSocket 發送過來的流式數據結果。下面是 2 條我們在 Zeppelin 裡運行的 Flink SQL。

image5.png
image8.png

接下來我們會用 Zeppelin Session API 來跑著這 2 條 Flink SQL,然後我們會在MyStatementMessageHandler1,MyStatementMessageHandler2 裡拿到結果展示出來。

ZSession session = null;
try {
    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<< span="">String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
    // otherwise you have to use a global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI: " + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

    // run flink ssql
    Map<< span="">String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (session != null) {
        try {
            session.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
    }
}

除了編程方式跑 Flink Job,這個 Session API 還能給我們帶來什麼呢?

在 Zeppelin 裡如果你可以通過 %flink.conf 來對你的 Flink Cluster 進行非常豐富的配置,但是 %flink.conf 是純文本的配置,不熟悉 Flink 的人很容易配錯(如下圖)。如果你是自己做 Flink 開發平臺的話就可以做一個更完整的 UI,用一些下拉框等等把一些配置選項固定下來,用戶只要選擇就行了,不需要自己輸入文本來配置。

image7.png

還有下面這類 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比較容易寫錯的,同理你可以在自己 UI 裡用一些控件把這些選項提前固定下來,而不是讓用戶輸入文本的方式。

image6.png

我相信 Zeppelin Client API 還有很多可以發揮和想象的空間,大家腦洞起來吧。

▼ 視頻演示 ▼

視頻演示鏈接
https://v.qq.com/x/page/m3146grr5e1.html

更多 Flink 技術乾貨及使用交流可加入 Flink 社區釘釘大群。

最新釘群二維碼.jpeg

Leave a Reply

Your email address will not be published. Required fields are marked *