開發與維運

java併發編程中的CAS機制,你理解嘛?

一、為什麼需要CAS機制?


為什麼需要CAS機制呢?我們先從一個錯誤現象談起。我們經常使用volatile關鍵字修飾某一個變量,表明這個變量是全局共享的一個變量,同時具有了可見性和有序性。但是卻沒有原子性。比如說一個常見的操作a++。這個操作其實可以細分成三個步驟:

(1)從內存中讀取a

(2)對a進行加1操作

(3)將a的值重新寫入內存中

在單線程狀態下這個操作沒有一點問題,但是在多線程中就會出現各種各樣的問題了。因為可能一個線程對a進行了加1操作,還沒來得及寫入內存,其他的線程就讀取了舊值。造成了線程的不安全現象。如何去解決這個問題呢?最常見的方式就是使用AtomicInteger來修飾a。我們可以看一下代碼:

public class Test3 {
    //使用AtomicInteger定義a
    static AtomicInteger a = new AtomicInteger();
    public static void main(String[] args) {
        Test3 test = new Test3();
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        //使用getAndIncrement函數進行自增操作
                        System.out.println(a.incrementAndGet());        
                        Thread.sleep(500);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            threads[i].start();
        }
    }
}

現在我們使用AtomicInteger類並且調用了incrementAndGet方法來對a進行自增操作。這個incrementAndGet是如何實現的呢?我們可以看一下AtomicInteger的源碼。

/**
     * Atomically increments by one the current value.
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

我們到這一步可以看到其實就是usafe調用了getAndAddInt的方法實現的,但是現在我們還看不出什麼,我們再深入到源碼中看看getAndAddInt方法又是如何實現的,

public final int getAndAddInt(Object var1, long var2, int var4) {   
    int var5;     
    do {          
        var5 = this.getIntVolatile(var1, var2);   
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));    
    return var5;   
}

到了這一步就稍微有點眉目了,原來底層調用的是compareAndSwapInt方法,這個compareAndSwapInt方法其實就是CAS機制。因此如果我們想搞清楚AtomicInteger的原子操作是如何實現的,我們就必須要把CAS機制搞清楚,這也是為什麼我們需要掌握CAS機制的原因。

二、分析CAS


1、基本含義

CAS全拼又叫做compareAndSwap,從名字上的意思就知道是比較交換的意思。比較交換什麼呢?

過程是這樣:它包含 3 個參數 CAS(V,E,N),V表示要更新變量的值,E表示預期值,N表示新值。僅當 V值等於E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他線程做兩個更新,則當前線程則什麼都不做。最後,CAS 返回當前V的真實值。

我們舉一個我之前舉過的例子來說明這個過程:

比如說給你兒子訂婚。你兒子就是內存位置,你原本以為你兒子是和楊貴妃在一起了,結果在訂婚的時候發現兒子身邊是西施。這時候該怎麼辦呢?你一氣之下不做任何操作。如果兒子身邊是你預想的楊貴妃,你一看很開心就給他們訂婚了,也叫作執行操作。現在你應該明白了吧。

CAS 操作時抱著樂觀的態度進行的,它總是認為自己可以成功完成操作。所以CAS也叫作樂觀鎖,那什麼是悲觀鎖呢?悲觀鎖就是我們之前赫赫有名的synchronized。悲觀鎖的思想你可以這樣理解,一個線程想要去獲得這個鎖但是卻獲取不到,必須要別人釋放了才可以。

2、底層原理

想要弄清楚其底層原理,深入到源碼是最好的方式,在上面我們已經通過源碼看到了其實就是Usafe的方法來完成的,在這個方法中使用了compareAndSwapInt這個CAS機制。因此,現在我們有必要進一步深入進去看看:

public final class Unsafe {
    // compareAndSwapInt 是 native 類型的方法
    public final native boolean compareAndSwapInt(
        Object o, 
        long offset,
        int expected,
        int x
    );
    //剩餘還有很多方法
}

我們可以看到這裡面主要有四個參數,第一個參數就是我們操作的對象a,第二個參數是對象a的地址偏移量,第三個參數表示我們期待這個a是什麼值,第四個參數表示的是a的實際值。

不過這裡我們會發現這個compareAndSwapInt是一個native方法,也就是說再往下走就是C語言代碼,如果我們保持好奇心,可以繼續深入進去看看。

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, 
                                            jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  // 根據偏移量valueOffset,計算 value 的地址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // 調用 Atomic 中的函數 cmpxchg來進行比較交換
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

上面的代碼我們解讀一下:首先使用jint計算了value的地址,然後根據這個地址,使用了Atomic的cmpxchg方法進行比較交換。現在問題又拋給了這個cmpxchg,真實實現的是這個函數。我們再進一步深入看看,真相已經離我們不遠了。

unsigned Atomic::cmpxchg(unsigned int exchange_value,
                         volatile unsigned int* dest, 
                         unsigned int compare_value) {
    assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
  /*
   * 根據操作系統類型調用不同平臺下的重載函數,
     這個在預編譯期間編譯器會決定調用哪個平臺下的重載函數
  */
    return (unsigned int)Atomic::cmpxchg((jint)exchange_value, 
                     (volatile jint*)dest, (jint)compare_value);
}

皮球又一次被完美的踢走了,現在在不同的操作系統下會調用不同的cmpxchg重載函數,我現在用的是win10系統,所以我們看看這個平臺下的實現,彆著急再往下走走:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, 
                            jint compare_value) {
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

這塊的代碼就有點涉及到彙編指令相關的代碼了,到這一步就徹底接近真相了,首先三個move指令表示的是將後面的值移動到前面的寄存器上。然後調用了LOCK_IF_MP和下面cmpxchg彙編指令進行了比較交換。現在我們不知道這個LOCK_IF_MP和cmpxchg是如何交換的,沒關係我們最後再深入一下。

真相來了,他來了,他真的來了。

inline jint Atomic::cmpxchg (jint exchange_value, 
                             volatile jint* dest, jint compare_value) {
  //1、 判斷是否是多核 CPU
  int mp = os::is_MP();
  __asm {
    //2、 將參數值放入寄存器中
    mov edx, dest   
    mov ecx, exchange_value
    mov eax, compare_value 
    //3、LOCK_IF_MP指令
    cmp mp, 0
    //4、 如果 mp = 0,表明線程運行在單核CPU環境下。此時 je 會跳轉到 L0 標記處,直接執行 cmpxchg 指令
    je L0
    _emit 0xF0
//5、這裡真正實現了比較交換
L0:
    /*
     * 比較並交換。簡單解釋一下下面這條指令,熟悉彙編的朋友可以略過下面的解釋:
     *   cmpxchg: 即“比較並交換”指令
     *   dword: 全稱是 double word 表示兩個字,一共四個字節
     *   ptr: 全稱是 pointer,與前面的 dword 連起來使用,表明訪問的內存單元是一個雙字單元 
     * 這一條指令的意思就是:
            將 eax 寄存器中的值(compare_value)與 [edx] 雙字內存單元中的值進行對比,
            如果相同,則將 ecx 寄存器中的值(exchange_value)存入 [edx] 內存單元中。
     */
    cmpxchg dword ptr [edx], ecx
  }
}

到這一步了,相信你應該理解了這個CAS真正實現的機制了吧,最終是由操作系統的彙編指令完成的。

3、CAS機制的優缺點


(1)優點


一開始在文中我們曾經提到過,cas是一種樂觀鎖,而且是一種非阻塞的輕量級的樂觀鎖,什麼是非阻塞式的呢?其實就是一個線程想要獲得鎖,對方會給一個迴應表示這個鎖能不能獲得。在資源競爭不激烈的情況下性能高,相比synchronized重量鎖,synchronized會進行比較複雜的加鎖,解鎖和喚醒操作。

(2)缺點


缺點也是一個非常重要的知識點,因為涉及到了一個非常著名的問題,叫做ABA問題。假設一個變量 A ,修改為 B之後又修改為 A,CAS 的機制是無法察覺的,但實際上已經被修改過了。這就是ABA問題,

ABA問題會帶來大量的問題,比如說數據不一致的問題等等。我們可以舉一個例子來解釋說明。

你有一瓶水放在桌子上,別人把這瓶水喝完了,然後重新倒上去。你再去喝的時候發現水還是跟之前一樣,就誤以為是剛剛那杯水。如果你知道了真相,那是別人用過了你還會再用嘛?舉一個比較黃一點的例子,女朋友被別人睡過之後又回來,還是之前的那個女朋友嘛?

ABA可以有很多種方式來解決,我們在後續的文章中再進行敘述和討論。

Leave a Reply

Your email address will not be published. Required fields are marked *