這些Java并發(fā)容器,你都了解嗎?
前言
在多線程環(huán)境下,數(shù)據(jù)的并發(fā)訪問和修改是無法避免的問題。
為了解決這個(gè)問題,Java 提供了一系列并發(fā)容器,這些容器在內(nèi)部已經(jīng)處理了并發(fā)問題,使得我們可以在多線程環(huán)境下安全地訪問和修改數(shù)據(jù)。
并發(fā)容器
1.ConcurrentHashMap 并發(fā)版 HashMap
最常見的并發(fā)容器之一,可以用作并發(fā)場(chǎng)景下的緩存。底層依然是哈希表,但在 JAVA 8 中有了不小的改變,而 JAVA 7 和 JAVA 8 都是用的比較多的版本,因此經(jīng)常會(huì)將這兩個(gè)版本的實(shí)現(xiàn)方式做一些比較(比如面試中)。
一個(gè)比較大的差異就是,JAVA 7 中采用分段鎖來減少鎖的競(jìng)爭(zhēng),JAVA 8 中放棄了分段鎖,采用 CAS(一種樂觀鎖),同時(shí)為了防止哈希沖突嚴(yán)重時(shí)退化成鏈表(沖突時(shí)會(huì)在該位置生成一個(gè)鏈表,哈希值相同的對(duì)象就鏈在一起),會(huì)在鏈表長(zhǎng)度達(dá)到閾值(8)后轉(zhuǎn)換成紅黑樹(比起鏈表,樹的查詢效率更穩(wěn)定)。
示例
import java.util.concurrent.*;
public class ConcurrentHashMapExample {
  public static void main(String[] args) {
      // Creating a ConcurrentHashMap
      ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();
      // Adding elements to the ConcurrentHashMap
      map.put("Key1", "Value1");
      map.put("Key2", "Value2");
      map.put("Key3", "Value3");
      // Printing the ConcurrentHashMap
      System.out.println("ConcurrentHashMap: " + map);
  }
}2.CopyOnWriteArrayList 并發(fā)版 ArrayList
并發(fā)版 ArrayList,底層結(jié)構(gòu)也是數(shù)組,和 ArrayList 不同之處在于:當(dāng)新增和刪除元素時(shí)會(huì)創(chuàng)建一個(gè)新的數(shù)組,在新的數(shù)組中增加或者排除指定對(duì)象,最后用新增數(shù)組替換原來的數(shù)組。
CopyOnWriteArrayList 的主要特性是,每當(dāng)列表修改時(shí),例如添加或刪除元素,它都會(huì)創(chuàng)建列表的一個(gè)新副本。原始列表和新副本都可以進(jìn)行并發(fā)讀取,這樣就可以在不鎖定整個(gè)列表的情況下進(jìn)行并發(fā)讀取。這種方法在讀取操作遠(yuǎn)多于寫入操作的場(chǎng)景中非常有用。
適用場(chǎng)景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場(chǎng)景。
局限:由于讀的時(shí)候不會(huì)加鎖(讀的效率高,就和普通 ArrayList 一樣),讀取的當(dāng)前副本,因此可能讀取到臟數(shù)據(jù)。如果介意,建議不用。
看看源碼感受下:
圖片
示例
import java.util.concurrent.*;
public class CopyOnWriteArrayListExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) CopyOnWriteArrayList
      CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
      // 向 CopyOnWriteArrayList 添加元素
      list.add("Element1");
      list.add("Element2");
      list.add("Element3");
      // 打印 CopyOnWriteArrayList
      System.out.println("CopyOnWriteArrayList: " + list);
  }
}3.CopyOnWriteArraySet 并發(fā) Set
基于 CopyOnWriteArrayList 實(shí)現(xiàn)(內(nèi)含一個(gè) CopyOnWriteArrayList 成員變量),也就是說底層是一個(gè)數(shù)組,意味著每次 add 都要遍歷整個(gè)集合才能知道是否存在,不存在時(shí)需要插入(加鎖)。
CopyOnWriteArraySet 的工作原理與 CopyOnWriteArrayList 類似。每當(dāng)發(fā)生修改操作(如添加或刪除元素)時(shí),它都會(huì)創(chuàng)建集合的一個(gè)新副本。原始集合和新副本都可以進(jìn)行并發(fā)讀取,這樣就可以在不鎖定整個(gè)集合的情況下進(jìn)行并發(fā)讀取。這種方法在讀取操作遠(yuǎn)多于寫入操作的場(chǎng)景中非常有用。
適用場(chǎng)景:在 CopyOnWriteArrayList 適用場(chǎng)景下加一個(gè),集合別太大(全部遍歷傷不起)。
示例
import java.util.concurrent.*;
public class CopyOnWriteArraySetExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) CopyOnWriteArraySet
      CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<String>();
      // 向 CopyOnWriteArraySet 添加元素
      set.add("Element1");
      set.add("Element2");
      set.add("Element3");
      // 打印 CopyOnWriteArraySet
      System.out.println("CopyOnWriteArraySet: " + set);
  }
}4.ConcurrentLinkedQueue 并發(fā)隊(duì)列 (基于鏈表)
基于鏈表實(shí)現(xiàn)的并發(fā)隊(duì)列,使用樂觀鎖 (CAS) 保證線程安全。因?yàn)閿?shù)據(jù)結(jié)構(gòu)是鏈表,所以理論上是沒有隊(duì)列大小限制的,也就是說添加數(shù)據(jù)一定能成功。
ConcurrentLinkedQueue 是 Java 并發(fā)包的一部分,它是基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列。它按照 FIFO(先進(jìn)先出)的原則對(duì)元素進(jìn)行排序。
ConcurrentLinkedQueue 的主要優(yōu)點(diǎn)是它允許完全并發(fā)的插入,并且使用了一種高效的“wait-free”算法。
示例
import java.util.concurrent.*;
public class ConcurrentLinkedQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) ConcurrentLinkedQueue
      ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
      // 向 ConcurrentLinkedQueue 添加元素
      queue.add("Element1");
      queue.add("Element2");
      queue.add("Element3");
      // 打印 ConcurrentLinkedQueue
      System.out.println("ConcurrentLinkedQueue: " + queue);
  }
}5.ConcurrentLinkedDeque 并發(fā)隊(duì)列 (基于雙向鏈表)
基于雙向鏈表實(shí)現(xiàn)的并發(fā)隊(duì)列,可以分別對(duì)頭尾進(jìn)行操作,因此除了先進(jìn)先出 (FIFO),也可以先進(jìn)后出(FILO),當(dāng)然先進(jìn)后出的話應(yīng)該叫它棧了。
ConcurrentLinkedDeque 是 Java 并發(fā)包的一部分,它是一個(gè)基于鏈接節(jié)點(diǎn)的無界并發(fā)雙端隊(duì)列。在 ConcurrentLinkedDeque 中,添加、刪除等操作可以在隊(duì)列的兩端進(jìn)行,使其具有更高的并發(fā)性。
示例
import java.util.concurrent.*;
public class ConcurrentLinkedDequeExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) ConcurrentLinkedDeque
      ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<String>();
      // 向 ConcurrentLinkedDeque 添加元素
      deque.add("Element1");
      deque.addFirst("Element2");
      deque.addLast("Element3");
      // 打印 ConcurrentLinkedDeque
      System.out.println("ConcurrentLinkedDeque: " + deque);
  }
}6.ConcurrentSkipListMap 基于跳表的并發(fā) Map
ConcurrentSkipListMap 是 Java 并發(fā)包的一部分,它是一個(gè)線程安全的排序映射表。它使用跳表的數(shù)據(jù)結(jié)構(gòu)來保證元素的有序性和并發(fā)性。
跳表是一種可以進(jìn)行二分查找的有序鏈表。ConcurrentSkipListMap 提供了預(yù)期的平均 log(n) 時(shí)間成本來執(zhí)行 containsKey,get,put 和 remove 操作,并且它的并發(fā)性通常優(yōu)于基于樹的算法。
SkipList 即跳表,跳表是一種空間換時(shí)間的數(shù)據(jù)結(jié)構(gòu),通過冗余數(shù)據(jù),將鏈表一層一層索引,達(dá)到類似二分查找的效果
圖片
示例
import java.util.concurrent.*;
public class ConcurrentSkipListMapExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) ConcurrentSkipListMap
      ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<String, String>();
      // 向 ConcurrentSkipListMap 添加元素
      map.put("Key1", "Value1");
      map.put("Key2", "Value2");
      map.put("Key3", "Value3");
      // 打印 ConcurrentSkipListMap
      System.out.println("ConcurrentSkipListMap: " + map);
  }
}7.ConcurrentSkipListSet 基于跳表的并發(fā) Set
類似 HashSet 和 HashMap 的關(guān)系,ConcurrentSkipListSet 里面就是一個(gè) ConcurrentSkipListMap,
ConcurrentSkipListSet 是 Java 并發(fā)包的一部分,它是一個(gè)線程安全的排序集合。它使用跳表的數(shù)據(jù)結(jié)構(gòu)來保證元素的有序性和并發(fā)性。
跳表是一種可以進(jìn)行二分查找的有序鏈表。ConcurrentSkipListSet 提供了預(yù)期的平均 log(n) 時(shí)間成本來執(zhí)行 contains,add 和 remove 操作,并且它的并發(fā)性通常優(yōu)于基于樹的算法。
示例
import java.util.concurrent.*;
public class ConcurrentSkipListSetExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) ConcurrentSkipListSet
      ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<String>();
      // 向 ConcurrentSkipListSet 添加元素
      set.add("Element1");
      set.add("Element2");
      set.add("Element3");
      // 打印 ConcurrentSkipListSet
      System.out.println("ConcurrentSkipListSet: " + set);
  }
}8.ArrayBlockingQueue 阻塞隊(duì)列 (基于數(shù)組)
ArrayBlockingQueue 是 Java 并發(fā)包的一部分,它是一個(gè)基于數(shù)組的有界阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。
ArrayBlockingQueue 在嘗試插入元素到已滿隊(duì)列或從空隊(duì)列中移除元素時(shí),會(huì)導(dǎo)致線程阻塞,直到有空間或元素可用。
基于數(shù)組實(shí)現(xiàn)的可阻塞隊(duì)列,構(gòu)造時(shí)必須制定數(shù)組大小,往里面放東西時(shí)如果數(shù)組滿了便會(huì)阻塞直到有位置(也支持直接返回和超時(shí)等待),通過一個(gè)鎖 ReentrantLock 保證線程安全。
圖片
乍一看會(huì)有點(diǎn)疑惑,讀和寫都是同一個(gè)鎖,那要是空的時(shí)候正好一個(gè)讀線程來了不會(huì)一直阻塞嗎?
答案就在 notEmpty、notFull 里,這兩個(gè)出自 lock 的小東西讓鎖有了類似 synchronized + wait + notify 的功能。傳送門 → 終于搞懂了 sleep/wait/notify/notifyAll
示例
import java.util.concurrent.*;
public class ArrayBlockingQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) ArrayBlockingQueue
      ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
      // 向 ArrayBlockingQueue 添加元素
      try {
          queue.put("Element1");
          queue.put("Element2");
          queue.put("Element3");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      // 打印 ArrayBlockingQueue
      System.out.println("ArrayBlockingQueue: " + queue);
  }
}9.LinkedBlockingQueue 阻塞隊(duì)列 (基于鏈表)
LinkedBlockingQueue 是 Java 并發(fā)包的一部分,它是一個(gè)基于鏈表的可選有界阻塞隊(duì)列。此隊(duì)列按照 FIFO(先進(jìn)先出)的原則對(duì)元素進(jìn)行排序。
LinkedBlockingQueue 在嘗試插入元素到已滿隊(duì)列或從空隊(duì)列中移除元素時(shí),會(huì)導(dǎo)致線程阻塞,直到有空間或元素可用。
基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,想比與不阻塞的 ConcurrentLinkedQueue,它多了一個(gè)容量限制,如果不設(shè)置默認(rèn)為 int 最大值。
示例
import java.util.concurrent.*;
public class LinkedBlockingQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) LinkedBlockingQueue
      LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
      // 向 LinkedBlockingQueue 添加元素
      try {
          queue.put("Element1");
          queue.put("Element2");
          queue.put("Element3");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      // 打印 LinkedBlockingQueue
      System.out.println("LinkedBlockingQueue: " + queue);
  }
}10.LinkedBlockingDeque 阻塞隊(duì)列 (基于雙向鏈表)
LinkedBlockingDeque 是 Java 并發(fā)包的一部分,它是一個(gè)基于鏈表的可選有界阻塞雙端隊(duì)列。此隊(duì)列按照 FIFO(先進(jìn)先出)的原則對(duì)元素進(jìn)行排序。
LinkedBlockingDeque 在嘗試插入元素到已滿隊(duì)列或從空隊(duì)列中移除元素時(shí),會(huì)導(dǎo)致線程阻塞,直到有空間或元素可用。雙端隊(duì)列的優(yōu)勢(shì)在于可以從兩端插入或移除元素。
類似 LinkedBlockingQueue,但提供了雙向鏈表特有的操作。
示例
import java.util.concurrent.*;
public class LinkedBlockingDequeExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) LinkedBlockingDeque
      LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<String>(3);
      // 向 LinkedBlockingDeque 添加元素
      try {
          deque.putFirst("Element1");
          deque.putLast("Element2");
          deque.putFirst("Element3");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      // 打印 LinkedBlockingDeque
      System.out.println("LinkedBlockingDeque: " + deque);
  }
}11.PriorityBlockingQueue 線程安全的優(yōu)先隊(duì)列
PriorityBlockingQueue 是 Java 并發(fā)包的一部分,它是一個(gè)無界的并發(fā)隊(duì)列。它使用了和類 java.util.PriorityQueue 一樣的排序規(guī)則,并且能夠確保在并發(fā)環(huán)境下的線程安全。
PriorityBlockingQueue 中的元素按照自然順序或者由比較器提供的順序進(jìn)行排序。隊(duì)列不允許使用 null 元素。
構(gòu)造時(shí)可以傳入一個(gè)比較器,可以看做放進(jìn)去的元素會(huì)被排序,然后讀取的時(shí)候按順序消費(fèi)。某些低優(yōu)先級(jí)的元素可能長(zhǎng)期無法被消費(fèi),因?yàn)椴粩嘤懈邇?yōu)先級(jí)的元素進(jìn)來。
示例
import java.util.concurrent.*;
public class PriorityBlockingQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) PriorityBlockingQueue
      PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<String>();
      // 向 PriorityBlockingQueue 添加元素
      queue.add("Element1");
      queue.add("Element2");
      queue.add("Element3");
      // 打印 PriorityBlockingQueue
      System.out.println("PriorityBlockingQueue: " + queue);
  }
}12.SynchronousQueue 數(shù)據(jù)同步交換的隊(duì)列
SynchronousQueue 是 Java 并發(fā)包的一部分,它是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè) put 操作必須等待一個(gè) take 操作,否則不能繼續(xù)添加元素,反之亦然。
這種特性使 SynchronousQueue 成為線程之間傳遞數(shù)據(jù)的好工具。它可以看作是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。
一個(gè)虛假的隊(duì)列,因?yàn)樗鼘?shí)際上沒有真正用于存儲(chǔ)元素的空間,每個(gè)插入操作都必須有對(duì)應(yīng)的取出操作,沒取出時(shí)無法繼續(xù)放入。
示例
import java.util.concurrent.SynchronousQueue;
public class Main {
   
  public static void main(String[] args) {
      SynchronousQueue<Integer> queue = new SynchronousQueue<>();
      new Thread(()->{
          try{
              for(int i=0;;i++){
                  System.out.println("放入:" + i);
                  queue.put(i);
              }
          }catch (InterruptedException e){
              e.printStackTrace();
          }
      }).start();
      new Thread(()->{
          try{
              while(true){
                  System.out.println("取出:" + queue.take());
                  Thread.sleep((long)(Math.random()*2000));
              }
          }catch (InterruptedException e){
              e.printStackTrace();
          }
      }).start();
  }
}運(yùn)行結(jié)果:
取出:0
放入:0
取出:1
放入:1
放入:2
取出:2
取出:3
放入:3
取出:4
放入:4
...
...可以看到,寫入的線程沒有任何 sleep,可以說是全力往隊(duì)列放東西,而讀取的線程又很不積極,讀一個(gè)又 sleep 一會(huì)。輸出的結(jié)果卻是讀寫操作成對(duì)出現(xiàn)。
JAVA 中一個(gè)使用場(chǎng)景就是 Executors.newCachedThreadPool(),創(chuàng)建一個(gè)緩存線程池。
圖片
13.LinkedTransferQueue 基于鏈表的數(shù)據(jù)交換隊(duì)列
LinkedTransferQueue 是 Java 并發(fā)包的一部分,它是一個(gè)由鏈表結(jié)構(gòu)組成的無界轉(zhuǎn)移阻塞隊(duì)列。隊(duì)列按照 FIFO(先進(jìn)先出)的原則對(duì)元素進(jìn)行排序。
LinkedTransferQueue 的一個(gè)特性是,它可以嘗試將元素直接轉(zhuǎn)移給消費(fèi)者,如果沒有等待的消費(fèi)者,元素就會(huì)被添加到隊(duì)列的尾部,等待消費(fèi)者來獲取。
實(shí)現(xiàn)了接口 TransferQueue,通過 transfer 方法放入元素時(shí),如果發(fā)現(xiàn)有線程在阻塞在取元素,會(huì)直接把這個(gè)元素給等待線程。如果沒有人等著消費(fèi),那么會(huì)把這個(gè)元素放到隊(duì)列尾部,并且此方法阻塞直到有人讀取這個(gè)元素。和 SynchronousQueue 有點(diǎn)像,但比它更強(qiáng)大。
示例
import java.util.concurrent.*;
public class LinkedTransferQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) LinkedTransferQueue
      LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
      // 啟動(dòng)一個(gè)新線程來從 LinkedTransferQueue 取出元素
      new Thread(() -> {
          try {
              System.out.println("Taken: " + queue.take());
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }).start();
      // 向 LinkedTransferQueue 添加一個(gè)元素
      try {
          queue.transfer("Element");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}14.DelayQueue 延時(shí)隊(duì)列
DelayQueue 是 Java 并發(fā)包的一部分,它是一個(gè)無界阻塞隊(duì)列,只有在延遲期滿時(shí)才能從中提取元素。此隊(duì)列的頭部是延遲期滿后保存時(shí)間最長(zhǎng)的元素。如果延遲都還沒有期滿,則隊(duì)列沒有頭部,并且 poll 將返回 null。
元素在 DelayQueue 中的順序是按照其到期時(shí)間的先后順序進(jìn)行排序的,越早到期的元素越排在隊(duì)列前面。延遲隊(duì)列常用于實(shí)現(xiàn)定時(shí)任務(wù)功能。
可以使放入隊(duì)列的元素在指定的延時(shí)后才被消費(fèi)者取出,元素需要實(shí)現(xiàn) Delayed 接口。
示例
import java.util.concurrent.*;
public class DelayQueueExample {
  public static void main(String[] args) {
      // 創(chuàng)建一個(gè) DelayQueue
      DelayQueue<DelayedElement> queue = new DelayQueue<DelayedElement>();
      // 向 DelayQueue 添加一個(gè)元素,延遲 3 秒
      queue.put(new DelayedElement(3000, "Element"));
      // 從 DelayQueue 獲取元素
      try {
          DelayedElement element = queue.take();
          System.out.println("Taken: " + element);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}
class DelayedElement implements Delayed {
  private long delayTime; // 延遲時(shí)間
  private long expire; // 到期時(shí)間
  private String element; // 元素?cái)?shù)據(jù)
  public DelayedElement(long delay, String element) {
      this.delayTime = delay;
      this.element = element;
      this.expire = System.currentTimeMillis() + delay;
  }
  @Override
  public long getDelay(TimeUnit unit) {
      return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }
  @Override
  public int compareTo(Delayed o) {
      return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  }
  @Override
  public String toString() {
      return element;
  }
}總結(jié)
從上面的介紹總總結(jié)有以下幾種容器類
- ConcurrentHashMap:并發(fā)版 HashMap
 - CopyOnWriteArrayList:并發(fā)版 ArrayList
 - CopyOnWriteArraySet:并發(fā) Set
 - ConcurrentLinkedQueue:并發(fā)隊(duì)列 (基于鏈表)
 - ConcurrentLinkedDeque:并發(fā)隊(duì)列 (基于雙向鏈表)
 - ConcurrentSkipListMap:基于跳表的并發(fā) Map
 - ConcurrentSkipListSet:基于跳表的并發(fā) Set
 - ArrayBlockingQueue:阻塞隊(duì)列 (基于數(shù)組)
 - LinkedBlockingQueue:阻塞隊(duì)列 (基于鏈表)
 - LinkedBlockingDeque:阻塞隊(duì)列 (基于雙向鏈表)
 - PriorityBlockingQueue:線程安全的優(yōu)先隊(duì)列
 - SynchronousQueue:讀寫成對(duì)的隊(duì)列
 - LinkedTransferQueue:基于鏈表的數(shù)據(jù)交換隊(duì)列
 - DelayQueue:延時(shí)隊(duì)列
 
Java 并發(fā)容器為處理多線程環(huán)境下的數(shù)據(jù)訪問和修改提供了強(qiáng)大的工具。
通過了解和學(xué)習(xí)這些并發(fā)容器,我們可以更好地理解并發(fā)編程,更有效地處理并發(fā)問題。
無論你是正在學(xué)習(xí) Java,還是已經(jīng)在使用 Java 進(jìn)行開發(fā),我都強(qiáng)烈建議你深入了解這些并發(fā)容器,它們將在你的并發(fā)編程之路上起到重要的作用。















 
 
 








 
 
 
 