雲計算

ZooKeeper入門,看這篇就夠了

思維導圖

image.png

文章已收錄到Github精選,歡迎Starhttps://github.com/yehongzhi/learningSummary

前言

在很多時候,我們都可以在各種框架應用中看到ZooKeeper的身影,比如Kafka中間件,Dubbo框架,Hadoop等等。為什麼到處都看到ZooKeeper?

一、什麼是ZooKeeper

ZooKeeper是一個分佈式服務協調框架,提供了分佈式數據一致性的解決方案,基於ZooKeeper的數據結構,Watcher,選舉機制等特點,可以實現數據的發佈/訂閱,軟負載均衡,命名服務,統一配置管理,分佈式鎖,集群管理等等。

二、為什麼使用ZooKeeper

ZooKeeper能保證:

  • 更新請求順序進行。來自同一個client的更新請求按其發送順序依次執行
  • 數據更新原子性。一次數據更新要麼成功,要麼失敗
  • 全局唯一數據視圖。client無論連接到哪個server,數據視圖都是一致的
  • 實時性。在一定時間範圍內,client讀到的數據是最新的

三、數據結構

ZooKeeper的數據結構和Unix文件系統很類似,總體上可以看做是一棵樹,每一個節點稱之為一個ZNode,每一個ZNode默認能存儲1M的數據。每一個ZNode可通過唯一的路徑標識。如下圖所示:

創建ZNode時,可以指定以下四種類型,包括:

  • PERSISTENT,持久性ZNode。創建後,即使客戶端與服務端斷開連接也不會刪除,只有客戶端主動刪除才會消失。
  • PERSISTENT_SEQUENTIAL,持久性順序編號ZNode。和持久性節點一樣不會因為斷開連接後而刪除,並且ZNode的編號會自動增加。
  • EPHEMERAL,臨時性ZNode。客戶端與服務端斷開連接,該ZNode會被刪除。
  • EPEMERAL_SEQUENTIAL,臨時性順序編號ZNode。和臨時性節點一樣,斷開連接會被刪除,並且ZNode的編號會自動增加。

四、監聽通知機制

Watcher是基於觀察者模式實現的一種機制。如果我們需要實現當某個ZNode節點發生變化時收到通知,就可以使用Watcher監聽器。

客戶端通過設置監視點(watcher)向 ZooKeeper 註冊需要接收通知的 znode,在 znode 發生變化時 ZooKeeper 就會向客戶端發送消息

這種通知機制是一次性的。一旦watcher被觸發,ZooKeeper就會從相應的存儲中刪除。如果需要不斷監聽ZNode的變化,可以在收到通知後再設置新的watcher註冊到ZooKeeper。

監視點的類型有很多,如監控ZNode數據變化、監控ZNode子節點變化、監控ZNode 創建或刪除

五、選舉機制

ZooKeeper是一個高可用的應用框架,因為ZooKeeper是支持集群的。ZooKeeper在集群狀態下,配置文件是不會指定Master和Slave,而是在ZooKeeper服務器初始化時就在內部進行選舉,產生一臺做為Leader,多臺做為Follower,並且遵守半數可用原則。

由於遵守半數可用原則,所以5臺服務器和6臺服務器,實際上最大允許宕機數量都是3臺,所以為了節約成本,集群的服務器數量一般設置為奇數

如果在運行時,如果長時間無法和Leader保持連接的話,則會再次進行選舉,產生新的Leader,以保證服務的可用

六、初の體驗

首先在官網下載ZooKeeper,我這裡用的是3.3.6版本。

然後解壓,複製一下/conf目錄下的zoo_sample.cfg文件,重命名為zoo.cfg。

修改zoo.cfg中dataDir的值,並創建對應的目錄:

最後到/bin目錄下啟動,我用的是window系統,所以啟動zkServer.cmd,雙擊即可:

啟動成功的話就可以看到這個對話框:

可視化界面的話,我推薦使用ZooInspector,操作比較簡便:

6.1 使用java連接ZooKeeper

首先引入Maven依賴:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

接著我們寫一個Main方法,進行操作:

    //連接地址及端口號
    private static final String SERVER_HOST = "127.0.0.1:2181";

    //會話超時時間
    private static final int SESSION_TIME_OUT = 2000;

    public static void main(String[] args) throws Exception {
        //參數一:服務端地址及端口號
        //參數二:超時時間
        //參數三:監聽器
        ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //獲取事件的狀態
                Event.KeeperState state = watchedEvent.getState();
                //判斷是否是連接事件
                if (Event.KeeperState.SyncConnected == state) {
                    Event.EventType type = watchedEvent.getType();
                    if (Event.EventType.None == type) {
                        System.out.println("zk客戶端已連接...");
                    }
                }
            }
        });
        zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("新增ZNode成功");
        zooKeeper.close();
    }

創建一個持久性ZNode,路徑是/java,值為"Hello World":

七、API概述

7.1 創建

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

參數解釋:

  • path ZNode路徑
  • data ZNode存儲的數據
  • acl ACL權限控制
  • createMode ZNode類型

ACL權限控制,有三個是ZooKeeper定義的常用權限,在ZooDefs.Ids類中:

/**
 * This is a completely open ACL.
 * 完全開放的ACL,任何連接的客戶端都可以操作該屬性znode
 */
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

/**
 * This ACL gives the creators authentication id's all permissions.
 * 只有創建者才有ACL權限
 */
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

/**
 * This ACL gives the world the ability to read.
 * 只能讀取ACL
 */
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));

createMode就是前面講過的四種ZNode類型:

public enum CreateMode {
    /**
     * 持久性ZNode
     */
    PERSISTENT (0, false, false),
    /**
     * 持久性自動增加順序號ZNode
     */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * 臨時性ZNode
     */
    EPHEMERAL (1, true, false),
    /**
     * 臨時性自動增加順序號ZNode
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
}

7.2 查詢

//同步獲取節點數據
public byte[] getData(String path, boolean watch, Stat stat){
    ...
}

//異步獲取節點數據
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){
    ...
}

同步getData()方法中的stat參數是用於接收返回的節點描述信息:

public byte[] getData(final String path, Watcher watcher, Stat stat){
    //省略...
    GetDataResponse response = new GetDataResponse();
    //發送請求到ZooKeeper服務器,獲取到response
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (stat != null) {
        //把response的Stat賦值到傳入的stat中
        DataTree.copyStat(response.getStat(), stat);
    }
}

使用同步getData()獲取數據:

    //數據的描述信息,包括版本號,ACL權限,子節點信息等等
    Stat stat = new Stat();
    //返回結果是byte[]數據,getData()方法底層會把描述信息複製到stat對象中
    byte[] bytes = zooKeeper.getData("/java", false, stat);
    //打印結果
    System.out.println("ZNode的數據data:" + new String(bytes));//Hello World
    System.out.println("獲取到dataVersion版本號:" + stat.getVersion());//默認數據版本號是0

7.3 更新

public Stat setData(final String path, byte data[], int version){
    ...
}

值得注意的是第三個參數version,使用CAS機制,這是為了防止多個客戶端同時更新節點數據,所以需要在更新時傳入版本號,每次更新都會使版本號+1,如果服務端接收到版本號,對比發現不一致的話,則會拋出異常。

所以,在更新前需要先查詢獲取到版本號,否則你不知道當前版本號是多少,就沒法更新:

    //獲取節點描述信息
    Stat stat = new Stat();
    zooKeeper.getData("/java", false, stat);
    System.out.println("更新ZNode數據...");
    //更新操作,傳入路徑,更新值,版本號三個參數,返回結果是新的描述信息
    Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
    System.out.println("更新後的版本號為:" + setData.getVersion());//更新後的版本號為:1

更新後,版本號增加了:

如果傳入的版本參數是"-1",就是告訴zookeeper服務器,客戶端需要基於數據的最新版本進行更新操作。但是-1並不是一個合法的版本號,而是一個標識符。

7.4 刪除

public void delete(final String path, int version){
    ...
}
  • path 刪除節點的路徑
  • version 版本號

這裡也需要傳入版本號,調用getData()方法即可獲取到版本號,很簡單:

Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//刪除ZNode
zooKeeper.delete("/java", stat.getVersion());

7.5 watcher機制

在上面第三點提到,ZooKeeper是可以使用通知監聽機制,當ZNode發生變化會收到通知消息,進行處理。基於watcher機制,ZooKeeper能玩出很多花樣。怎麼使用?

ZooKeeper的通知監聽機制,總的來說可以分為三個過程:

①客戶端註冊 Watcher
②服務器處理 Watcher
③客戶端回調 Watcher客戶端。

註冊 watcher 有 4 種方法,new ZooKeeper()、getData()、exists()、getChildren()。下面演示一下使用exists()方法註冊watcher:

首先需要實現Watcher接口,新建一個監聽器:

public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        //獲取事件類型
        Event.EventType eventType = event.getType();
        //通知狀態
        Event.KeeperState eventState = event.getState();
        //節點路徑
        String eventPath = event.getPath();
        System.out.println("監聽到的事件類型:" + eventType.name());
        System.out.println("監聽到的通知狀態:" + eventState.name());
        System.out.println("監聽到的ZNode路徑:" + eventPath);
    }
}

然後調用exists()方法,註冊監聽器:

zooKeeper.exists("/java", new MyWatcher());
//對ZNode進行更新數據的操作,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), -1);

然後在控制檯就可以看到打印的信息:

這裡我們可以看到Event.EventType對象就是事件類型,我們可以對事件類型進行判斷,再配合Event.KeeperState通知狀態,做相關的業務處理,事件類型有哪些?

打開EventType、KeeperState的源碼查看:

//事件類型是一個枚舉
public enum EventType {
    None (-1),//無
    NodeCreated (1),//Watcher監聽的數據節點被創建
    NodeDeleted (2),//Watcher監聽的數據節點被刪除
    NodeDataChanged (3),//Watcher監聽的數據節點內容發生變更
    NodeChildrenChanged (4);//Watcher監聽的數據節點的子節點列表發生變更
}

//通知狀態也是一個枚舉
public enum KeeperState {
    Unknown (-1),//屬性過期
    Disconnected (0),//客戶端與服務端斷開連接
    NoSyncConnected (1),//屬性過期
    SyncConnected (3),//客戶端與服務端正常連接
    AuthFailed (4),//身份認證失敗
    ConnectedReadOnly (5),//返回這個狀態給客戶端,客戶端只能處理讀請求
    SaslAuthenticated(6),//服務器採用SASL做校驗時
    Expired (-112);//會話session失效
}

7.5.1 watcher的特性

  • 一次性。一旦watcher被觸發,ZK都會從相應的存儲中移除。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是exists()方法的監聽器");
        }
    });
    //對ZNode進行更新數據的操作,觸發監聽器
    zooKeeper.setData("/java", "fly".getBytes(), -1);
    //企圖第二次觸發監聽器
    zooKeeper.setData("/java", "spring".getBytes(), -1);

  • 串行執行。客戶端Watcher回調的過程是一個串行同步的過程,這是為了保證順序。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是exists()方法的監聽器");
        }
    });
    Stat stat = new Stat();
    zooKeeper.getData("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是getData()方法的監聽器");
        }
    }, stat);
    //對ZNode進行更新數據的操作,觸發監聽器
    zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());

打印結果,說明先調用exists()方法的監聽器,再調用getData()方法的監聽器。因為exists()方法先註冊了。

  • 輕量級。WatchedEvent是ZK整個Watcher通知機制的最小通知單元。WatchedEvent包含三部分:通知狀態,事件類型,節點路徑。Watcher通知僅僅告訴客戶端發生了什麼事情,而不會說明事件的具體內容。

在這裡插入圖片描述

Leave a Reply

Your email address will not be published. Required fields are marked *