一、定義
CountDownLatch的作用很簡單,就是一個或者一組線程在開始執行操作之前,必須要等到其他線程執行完才可以。我們舉一個例子來說明,在考試的時候,老師必須要等到所有人交了試卷才可以走。此時老師就相當於等待線程,而學生就好比是執行的線程。
注意:java中還有一個同步工具類叫做CyclicBarrier,他的作用和CountDownLatch類似。同樣是等待其他線程都完成了,才可以進行下一步操作,我們再舉一個例子,在打王者的時候,在開局前所有人都必須要加載到100%才可以進入。否則所有玩家都相互等待。
我們看一下區別:CountDownLatch: 一個線程(或者多個), 等待另外N個線程完成某個事情之後才能執行。 CyclicBarrier : N個線程相互等待,任何一個線程完成之前,所有的線程都必須等待。關鍵點其實就在於那N個線程(1)CountDownLatch裡面N個線程就是學生,學生做完了試卷就可以走了,不用等待其他的學生是否完成(2)CyclicBarrier 裡面N個線程就是所有的遊戲玩家,一個遊戲玩家加載到100%還不可以,必須要等到其他的遊戲玩家都加載到100%才可以開局
現在應該理解CountDownLatch的含義了吧,下面我們使用一個代碼案例來解釋。
二、使用
我們使用學生考試的案例來進行演示:
public class CountDownLatchTest { static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) { System.out.println("全班同學開始考試:一共兩個學生"); new Thread(() -> { System.out.println("第一個學生交卷,countDownLatch減1"); countDownLatch.countDown(); }).start(); new Thread(() -> { System.out.println("第二個學生交卷,countDownLatch減1"); countDownLatch.countDown(); }).start(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("老師清點試卷,在此之前,只要一個學生沒交," + "countDownLatch不為0,不能離開考場"); } }
在上面,我們定義了一個CountDownLatch,並設置其值為2。有兩個學生使用兩個線程來表示,然後依次執行。最後老師線程(main線程)在學生線程都執行完了才可以執行。我們來運行一邊看看結果。
現在我們應該能體會到其用法了吧。在上面我們的等待線程時老師(main線程)。
下面我們對這個countDownLatch分析一下。為什麼具有上面的特點。
三、原理
在上面我們看到,CountDownLatch主要使用countDown方法進行減1操作,使用await方法進行等到操作。我們進入到源碼中看看。本源碼基於jdk1.8。特在此說明。
1、countDown原理
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); }
英語不好的人看起來真的是一臉懵逼,不過信號上面的英語還都是簡單的英語,大致意思是這樣的:CountDownLatch裡面保存了一個count值,通過減1操作,直到為0時候,等待線程才可以執行。而且通過源碼也可以看到這個countDown方法其實是通過sync調用releaseShared(1)來完成的。
OK。到了這一步我們可能會納悶,sync是個什麼鬼,releaseShared方法又是如何實現的。我們不妨接著看源碼,在CountDownLatch的開頭我們找到了答案,原來這個sync在這裡定義了。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
在這裡我們發現繼承了AbstractQueuedSynchronizer(AQS)。AQS的其中一個作用就是維護線程狀態和獲取釋放鎖。在這裡也就是說CountDownLatch使用AQS機制維護鎖狀態。而releaseShared(1)方法就是釋放了一個共享鎖。
現在理解了吧,底層使用AQS機制調用releaseShared方法釋放一個鎖資源。那麼等待的方法是如何實現的呢?
2、await原理
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
這倆方法都是讓線程等待,第一個沒有實現限制,第二個有時間限制,我們一個一個來看。
(1)await()
await()底層主要是acquireSharedInterruptibly方法實現的,繼續跟進去看看。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
這裡面有兩個if語句,首先第一個判斷是否被中斷,如果被中斷了,那就拋出中斷異常。然後判斷當前是否還有線程未執行,如果有那就,那就執行doAcquireSharedInterruptibly方法繼續等待。
//這是AQS裡面的方法 //arg在這裡調用的是1,表示countDown是否減少到了0 //如果到0了,那說明滿足了要求,返回1,不再等待 //如果沒有達到0,說明還有線程未執行,必須要等到所有的線程 //執行結束才可以,返回-1,此時小於0,執行doAcquireSharedInterruptibly protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
上面函數的意思已經在註釋裡面了,下面我們就來看看這個doAcquireSharedInterruptibly是如何實現的。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這塊的代碼比較長,不過大致意思我可以描述一下,他會用一個一個的節點將線程串起來 等達到條件後再一個一個的喚醒。核心就是第三行的addWaiter函數。我們可以再跟進去看看吧。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
你會發現這裡面也使用了CAS機制。而且就是使用鏈表穿起來的。
(2) await(long timeout, TimeUnit unit)
這個方法的意思是等待指定的時間,如果還有線程沒執行完,那就接著執行。就好比考完試了,還有同學沒交試卷,此時因為到時間了。不管三七二十一也不管剩下的同學是否提交,直接就走了。其底層是通過Sync的tryAcquireSharedNanos方法實現的,我們接著進入到源碼中看看。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
在這裡皮球又一次被踢走了,真正實現的其實就是doAcquireSharedNanos方法,tryAcquireShared方法主要是判斷是否當前滿足wait的條件。我們接著看。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上面的代碼看似長,最核心的就是for循環裡面的,最主要的意思就是如果當前還有線程未執行而且過了超時時間,那就直接執行等待線程就好了,不再等了。也就是我在指定的時間內你沒執行完我等著你,要是超了這個時間點我就不管了。
對於CountDownLatch來說原理主要還是通過源碼來認識。不過CountDownLatch看起來雖然很好用,也有很多不足之處,比如說CountDownLatch是一次性的 , 計數器的值只能在構造方法中初始化一次 , 之後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後 , 它不能再次被使用。
OK。對其介紹就先到這裡吧。