開發與維運

Java進階之併發編程——《我的Java打怪日記》

1. 概述

  • 三種性質
    • 可見性:一個線程對共享變量的修改,另一個線程能立刻看到。緩存可導致可見性問題。
    • 原子性:一個或多個CPU執行操作不被中斷。線程切換可導致原子性問題。
    • 有序性:編譯器優化可能導致指令順序發生改變。編譯器優化可能導致有序性問題。
  • 三個問題
    • 安全性問題:線程安全
    • 活躍性問題:死鎖、活鎖、飢餓
    • 性能問題
      • 使用無鎖結構:TLS,Copy-On-Write,樂觀鎖;Java的原子類,Disruptor無鎖隊列
      • 減少鎖的持有時間:讓鎖細粒度。如ConcurrentHashmap;再如讀寫鎖,讀無鎖寫有鎖

2. Java內存模型

  • volatile
    • C語言中的原意:禁用CPU緩存,從內存中讀出和寫入。
    • Java語言的引申義
      • Java會將變量立刻寫入內存,其他線程讀取時直接從內存讀(普通變量改變後,什麼時候寫入內存是不一定的)
      • 禁止指令重排序
    • 解決問題
      • 保證可見性
      • 保證有序性
      • 不能保證原子性
  • Happens-Before規則(H-B)
    • 程序順序性規則:前面執行的語句對後面語句可見
    • volatile變量規則:volatile變量的寫操作對後續的讀操作可見
    • 傳遞性規則:A H-B B,B H-B C,那麼A H-B C
    • 管程中鎖的規則:對一個鎖的解鎖 H-B於 後續對這個鎖的加鎖

3. 互斥鎖sychronized

  • 鎖對象:非靜態this,靜態Class,括號Object參數
  • 預防死鎖:
    • 互斥:不能破壞
    • 佔有且等待:同時申請所有資源
    • 不可搶佔:sychronized解決不了,Lock可以解決
    • 循環等待:給資源設置id字段,每次都是按順序申請鎖
  • 等待通知機制
    • wait、notify、notifyAll
class Allocator {
  private List<Object> als;
  // 一次性申請所有資源
  synchronized void apply(
    Object from, Object to){
    // 經典寫法
    while(als.contains(from) ||
         als.contains(to)){
      try{
        wait();
      }catch(Exception e){
      }   
    } 
    als.add(from);
    als.add(to);  
  }
  // 歸還資源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

4. 線程的生命週期

  • 通用線程的生命週期
  • 1.png
  • Java線程的生命週期
  • 2.png
  • 狀態流轉
    • RUNNABLE -- BLOCKED:線程獲取和等待sychronized隱式鎖
      • ps:調用阻塞式API時,不會進入BLOCKED狀態,但對於操作系統而言,線程實際上進入了休眠態,只不過JVM不關心。
    • RUNNABLE -- WAITING
      • Object.wait()
      • Thread.join()
      • LockSupport.park()
    • RUNNABLE -- TIMED-WAITING:調用各種帶超時參數的線程方法
    • NEW -- RUNNABLE:Thread.start()
    • RUNNABLE -- TERMINATED:線程運行完畢,有異常拋出,或手動調用線程stop()

6. 線程的性能指標

  • 延遲:發出請求到收到響應
  • 吞吐量:單位時間內處理的請求數量
  • 最佳線程數:
    • CPU密集型:線程數 = CPU核數 + 1
    • IO密集型:線程數 = (IO耗時/CPU耗時 + 1)* CPU核數
    • 3.png

7. JDK併發包

  • Lock:lock、unlock
    • 互斥鎖,和sychronized一樣的功能,裡面能保證可見性
  • Condition:await、signal
    • 條件,相比於sychronized的Object.wait,Condition可以實現多條件喚醒等待機制
  • Semaphore:acquire、release
    • 信號量,可以用來實現多個線程訪問一個臨界區,如實現對象池設計中的限流器
  • ReadWriteLock:readLock、writeLock
    • 寫鎖、讀鎖,允許多線程讀,一個線程寫,寫鎖持有時所有讀鎖和寫鎖的獲取都阻塞(寫鎖的獲取要等所有讀寫鎖釋放)
    • 適用於讀多寫少的場景
  • StampedLock:tryOptimisticRead、validate
    • 寫鎖、讀鎖(分悲觀讀鎖、樂觀讀鎖):
  • 線程同步:
    • CountDownLatch:一個線程等待多個線程
      • 初始化 --> countDown(減1) --> await(等待為0)
    • CyclicBarrier:一組線程之間相互等待
      • 初始化 --> 設置回調函數(為0時執行,並返回原始值) -->  await(減1並等待為0)
  • 併發容器:
    • List:
      • CopyOnWriteArrayList:適用寫少的場景,要容忍可能的讀不一致
    • Map:
      • ConcurrentHashMap:分段鎖
      • ConcurrentSkipListMap:跳錶
    • Set:
      • CopyOnWriteArraySet:同上
      • ConcurrentSkipListSet:同上
    • Queue
      • 分類:阻塞Blocking、單端Queue、雙端Deque
      • 單端阻塞(BlockingQueue):Array~、Linked~、Sychronized~、LinkedTransfer~、Priority~、Delay~
      • 雙端阻塞(BlockingDeque):Linked~
      • 單端非阻塞(Queue):ConcurrentLinked~
      • 雙端非阻塞(Deque):ConcurrentLinked~
  • 原子類:
    • 無鎖方案原理:增加了硬件支持,即CPU的CAS指令
    • ABA問題:有解決ABA問題的需求時,增加一個遞增的版本號緯度化解
    • 分類:原子化基本數據類型,原子化引用類型、原子化數組、原子化對象屬性更新器、原子化累加器
  • Future:
    • Future:cancel、isCanceled、isDone、get
    • FutureTask:實現了Runnable和Future接口
  • 強大工具類
    • CompletableFuture:一個強大的異步編程工具類(任務之間有聚合關係),暫時略
    • CompletionService:批量並行任務,暫時略

8. 線程池

  • 設計原理:
    • 生產者消費者模型,線程池是消費者,調用者是生產者。
    • 線程池對象裡維護一個阻塞隊列,一個已經跑起來的工作線程組ThreadsList
    • ThreadList裡面循環從隊列中去Runnable任務,並調用run方法
// 簡化的線程池,僅用來說明工作原理
class MyThreadPool{
  // 利用阻塞隊列實現生產者 - 消費者模式
  BlockingQueue<Runnable> workQueue;
  // 保存內部工作線程
  List<WorkerThread> threads
    = new ArrayList<>();
  // 構造方法
  MyThreadPool(int poolSize,
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // 創建工作線程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任務
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作線程負責消費任務,並執行任務
  class WorkerThread extends Thread{
    public void run() {
      // 循環取任務並執行
      while(true){ 
        Runnable task = workQueue.take();
        task.run();
      }
    }
  }
}

/** 下面是使用示例 **/
// 創建有界阻塞隊列
BlockingQueue<Runnable> workQueue =
  new LinkedBlockingQueue<>(2);
// 創建線程池
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任務
pool.execute(()->{
    System.out.println("hello");
});

  • ThreadPoolExcutor
    • 參數
      • corePoolSize:線程池保有的最小線程數
      • maximumPoolSize:線程池創建的最大線程數
      • keepAliveTime:工作線程多久沒收到任務,被認為是閒的
      • workQueue:工作隊列
      • threadFactory:通過這個參數自定義如何創建線程
      • handler:任務拒絕策略
        • 默認為AbortPolicy,會拋出RejectedExecutionException,這是個運行時異常,要注意
    • 方法
      • void execute()
      • Future submit(Runnable task | Callable task)

9. 鳥瞰並行任務分類

4.png

Leave a Reply

Your email address will not be published. Required fields are marked *