開發與維運

【併發容器精講三、】併發隊列Queue

@[toc]

1. 為什麼要使用隊列

  • 用隊列可以在線程間傳遞數據:生產者,消費者模式,銀行轉帳
  • 考慮鎖等線程安全問題重任 轉移到隊列上

2. 併發隊列簡介

簡單介紹各個併發併發隊列的關係,併發隊列是指線程安全的隊列,包含:阻塞隊列和非阻塞隊列,區別如下。

  • 阻塞隊列:滿了之後不能再插入,當隊列為空的時候,讀不到會阻塞
  • 非阻塞隊列:和阻塞隊列完全不一樣的

3. 各併發隊列關係圖

在這裡插入圖片描述
彩蛋 IDEA 生成UML

在這裡插入圖片描述
會出現 UML 圖
在這裡插入圖片描述

4. 阻塞隊列 BlockingQueue

1.什麼是阻塞隊列
: 阻塞隊列是具有阻塞功能到隊列,所以他首先是一個隊列,其次他具有阻塞功能
: 通常, 阻塞隊列的一端是給生產者放數據用的,另一端給消費者那數據用的。阻塞隊列是線程安全的所以 生產者 消費者都可以是多線程的
: 阻塞功能:最有特色的兩個帶有阻塞功能方法是

  • task()方法,獲取並移除隊列的頭節點,一旦執行task 任務的時候,隊列裡無數據 則阻塞
  • put() 方法 插入元素,但是如果隊列已滿,那就無法繼續插入,則阻塞,直到有空閒的空間
  • 我們需要考慮的是是否有界(容量有多大):這是一個非常重要的屬性,無界隊列意味著可以容納非常多的一個數 Integer.MAX_VALUE)約為2的31次
  • 阻塞隊列 和 線程池的關係:阻塞隊列是線程池重要組成部分

2.主要方法介紹
: put 、 take
: add、 remove、 element
: offer 、pull 、peek

ArrayBlockingQueue
: 有界
: 指定容量
: 公平 :還可以指定是否公平,如果指定公平,那麼等待時間較長的線程會被優先處理,不過會帶來一定的性能消耗
案例 :10個人面試,一個面試官,3個位置可以休息,每個人面試時間是10秒,模擬所有人面試場景

 package com.yxl.task;


import lombok.SneakyThrows;
import org.omg.PortableInterceptor.INACTIVE;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ArrayBlockingQueueDemo {


    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue= new ArrayBlockingQueue<String>(3);
        Interviewer interviewer = new Interviewer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(interviewer).start();
        new Thread(consumer).start();

    }



}


class Interviewer implements  Runnable{

    ArrayBlockingQueue<String> blockingDeque;

    public Interviewer(ArrayBlockingQueue blockingDeque) {
        this.blockingDeque = blockingDeque;
    }

    @Override
    public void run() {
        System.out.println("10個候選人");
        for (int i = 0; i < 10 ; i++) {
            try {
                blockingDeque.put("wo"+i);
                System.out.println("安排好了"+ i);
            }catch (Exception e){
                e.getMessage();
            }
        }
        try {
            blockingDeque.put("stop");
        }catch (Exception e){
            e.getMessage();
        }

    }
}


class Consumer implements  Runnable{

    ArrayBlockingQueue<String>  blockingDeque;

    public Consumer(ArrayBlockingQueue blockingDeque) {
        this.blockingDeque = blockingDeque;
    }

    @SneakyThrows
    @Override
    public void run() {
        Thread.sleep(1000);
        String take ;
        while (!(take = blockingDeque.take()).equals("stop")){
            System.out.println("打印"+take);
        }
            System.out.println("結束 ");
    }
}

運行結果
在這裡插入圖片描述
LinkedBlockingQueue
: 無界
: 容量可以達到 Integet.MAX_VALUE
: 內部結果 Node 、兩把鎖、分析put 方法

源碼

 /**
     * Linked list node class
     */
     //有一個一個的Node
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

兩把鎖

//lock 鎖 take 和  poll的鎖
 /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

//put 和 offer 的鎖
 /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

put() 方法 源碼

/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
// 判斷數據是否為空
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
       //將數據放入Node中
        Node<E> node = new Node<E>(e);
        
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加鎖
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
             //如果滿了 等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //否則存隊列
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

PriorityBlockingQueue
: 支持優先級
: 自然順序 (不是先進先出的)
: 無界隊列
: PriorityQueue 的一個安全的版本

SynchronusQueue
: 他的容量為0
: SynchronusQueue的容量不是1 而是0,因為他不需要去持有元素,他是直接傳遞的
: 效率很高
: 是一個很好的用來直接傳遞的併發數據結構

在這裡插入圖片描述

5. 非阻塞隊列

  • 併發包中的非阻塞隊列只有ConcurrentLinkedQueue這一種,顧名思義ConcurrentLinkedQueue是使用鏈表作為其數據結構的,使用CAS非阻塞算法實現線程安全(不具備阻塞功能)適用於性能比較高的要求

源碼

 /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
// for 死循環 做cas
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // cas 
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }



       // 運用 cas算法   compareAndSwapObject
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

6. 如何選擇適合的隊列

從這幾個方面選擇

  • 邊界
  • 空間
  • 吞吐量

個人博客地址:http://blog.yxl520.cn/

Leave a Reply

Your email address will not be published. Required fields are marked *