開發與維運

一文帶你理解java中的同步工具類CountDownLatch

一、定義


CountDownLatch的作用很簡單,就是一個或者一組線程在開始執行操作之前,必須要等到其他線程執行完才可以。我們舉一個例子來說明,在考試的時候,老師必須要等到所有人交了試卷才可以走。此時老師就相當於等待線程,而學生就好比是執行的線程。

注意:java中還有一個同步工具類叫做CyclicBarrier,他的作用和CountDownLatch類似。同樣是等待其他線程都完成了,才可以進行下一步操作,我們再舉一個例子,在打王者的時候,在開局前所有人都必須要加載到100%才可以進入。否則所有玩家都相互等待。

我們看一下區別:CountDownLatch: 一個線程(或者多個), 等待另外N個線程完成某個事情之後才能執行。 CyclicBarrier : N個線程相互等待,任何一個線程完成之前,所有的線程都必須等待。關鍵點其實就在於那N個線程(1)CountDownLatch裡面N個線程就是學生,學生做完了試卷就可以走了,不用等待其他的學生是否完成(2)CyclicBarrier 裡面N個線程就是所有的遊戲玩家,一個遊戲玩家加載到100%還不可以,必須要等到其他的遊戲玩家都加載到100%才可以開局

現在應該理解CountDownLatch的含義了吧,下面我們使用一個代碼案例來解釋。


二、使用


我們使用學生考試的案例來進行演示:

public class CountDownLatchTest {
    static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) {
        System.out.println("全班同學開始考試:一共兩個學生");
        new Thread(() -> {
            System.out.println("第一個學生交卷,countDownLatch減1");
            countDownLatch.countDown();
        }).start();
        new Thread(() -> {
            System.out.println("第二個學生交卷,countDownLatch減1");
            countDownLatch.countDown();
        }).start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("老師清點試卷,在此之前,只要一個學生沒交,"
                + "countDownLatch不為0,不能離開考場");
    }
}

在上面,我們定義了一個CountDownLatch,並設置其值為2。有兩個學生使用兩個線程來表示,然後依次執行。最後老師線程(main線程)在學生線程都執行完了才可以執行。我們來運行一邊看看結果。

v2-436a3aa1003fd57aa0ad74ea7abba4fc_1440w (1).jpg

現在我們應該能體會到其用法了吧。在上面我們的等待線程時老師(main線程)。

下面我們對這個countDownLatch分析一下。為什麼具有上面的特點。

三、原理


在上面我們看到,CountDownLatch主要使用countDown方法進行減1操作,使用await方法進行等到操作。我們進入到源碼中看看。本源碼基於jdk1.8。特在此說明。

1、countDown原理


/**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

英語不好的人看起來真的是一臉懵逼,不過信號上面的英語還都是簡單的英語,大致意思是這樣的:CountDownLatch裡面保存了一個count值,通過減1操作,直到為0時候,等待線程才可以執行。而且通過源碼也可以看到這個countDown方法其實是通過sync調用releaseShared(1)來完成的。

OK。到了這一步我們可能會納悶,sync是個什麼鬼,releaseShared方法又是如何實現的。我們不妨接著看源碼,在CountDownLatch的開頭我們找到了答案,原來這個sync在這裡定義了。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

在這裡我們發現繼承了AbstractQueuedSynchronizer(AQS)。AQS的其中一個作用就是維護線程狀態和獲取釋放鎖。在這裡也就是說CountDownLatch使用AQS機制維護鎖狀態。而releaseShared(1)方法就是釋放了一個共享鎖。

現在理解了吧,底層使用AQS機制調用releaseShared方法釋放一個鎖資源。那麼等待的方法是如何實現的呢?

2、await原理


public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

這倆方法都是讓線程等待,第一個沒有實現限制,第二個有時間限制,我們一個一個來看。

(1)await()


await()底層主要是acquireSharedInterruptibly方法實現的,繼續跟進去看看。

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

這裡面有兩個if語句,首先第一個判斷是否被中斷,如果被中斷了,那就拋出中斷異常。然後判斷當前是否還有線程未執行,如果有那就,那就執行doAcquireSharedInterruptibly方法繼續等待。

//這是AQS裡面的方法
//arg在這裡調用的是1,表示countDown是否減少到了0
//如果到0了,那說明滿足了要求,返回1,不再等待
//如果沒有達到0,說明還有線程未執行,必須要等到所有的線程
//執行結束才可以,返回-1,此時小於0,執行doAcquireSharedInterruptibly
protected int tryAcquireShared(int arg) {
     throw new UnsupportedOperationException();
}

上面函數的意思已經在註釋裡面了,下面我們就來看看這個doAcquireSharedInterruptibly是如何實現的。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這塊的代碼比較長,不過大致意思我可以描述一下,他會用一個一個的節點將線程串起來 等達到條件後再一個一個的喚醒。核心就是第三行的addWaiter函數。我們可以再跟進去看看吧。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

你會發現這裡面也使用了CAS機制。而且就是使用鏈表穿起來的。

(2) await(long timeout, TimeUnit unit)


這個方法的意思是等待指定的時間,如果還有線程沒執行完,那就接著執行。就好比考完試了,還有同學沒交試卷,此時因為到時間了。不管三七二十一也不管剩下的同學是否提交,直接就走了。其底層是通過Sync的tryAcquireSharedNanos方法實現的,我們接著進入到源碼中看看。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

在這裡皮球又一次被踢走了,真正實現的其實就是doAcquireSharedNanos方法,tryAcquireShared方法主要是判斷是否當前滿足wait的條件。我們接著看。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面的代碼看似長,最核心的就是for循環裡面的,最主要的意思就是如果當前還有線程未執行而且過了超時時間,那就直接執行等待線程就好了,不再等了。也就是我在指定的時間內你沒執行完我等著你,要是超了這個時間點我就不管了。

對於CountDownLatch來說原理主要還是通過源碼來認識。不過CountDownLatch看起來雖然很好用,也有很多不足之處,比如說CountDownLatch是一次性的 , 計數器的值只能在構造方法中初始化一次 , 之後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後 , 它不能再次被使用。

OK。對其介紹就先到這裡吧。

Leave a Reply

Your email address will not be published. Required fields are marked *