5、ConcurrentHashMap非阻塞Hash集合?
ConcurrentHashMap是Java并發包中提供的一個線程安全且高效的HashMap實現,ConcurrentHashMap在并發編程的場景中使用頻率非常之高,本文就來分析下ConcurrentHashMap的實現原理,并對其實現原理進行分析。
● ConcurrentLinkedQuere 類圖
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖。
ReentrantLock在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構,一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護者一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。
● ConcurrentLinkedQuere實現原理
眾所周知,哈希表是非常高效的,復雜度為O(1)的數據結構,在Java開發中,我們最常見到最頻繁使用的就是HashMap和HashTable,但是在線程競爭激烈的并發場景中使用都不夠合理。
● HashMap:先說HashMap,HashMap是線程不安全的,在并發環境下,可能會形成環狀鏈表(擴容時可能造成,具體原因自行百度google或查看源碼分析),導致get操作時,cpu 空轉,所以,在并發環境中使用HashMap是非常危險的。
● HashTable:HashTable和HashMap的實現原理幾乎一樣,差別無非是:
1、HashTable不允許key和value為null;
2、HashTable是線程安全的。但是HashTable線程安全的策略實現代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,這相當于給整個哈希表加了一把大鎖,多線程訪問時候,只要有一個線程訪問或操作該對象,那其他線程只能阻塞,相當于將所有的操作串行化,在競爭激烈的并發場景中性能就會非常差。如下圖:
HashTable性能差主要是由于所有操作需要競爭同一把鎖,而如果容器中有多把鎖,每一把鎖鎖一段數據,這樣在多線程訪問時不同段的數據時,就不會存在鎖競爭了,這樣便可以有效地提高并發效率。這就是ConcurrentHashMap所采用的“分段鎖”思想。
● ConcurrentLinkedQuere源碼解析
ConcurrentHashMap采用了非常精妙的"分段鎖"策略,ConcurrentHashMap的主干是個Segment數組。
final Segment<K,V>[] segments;
Segment繼承了ReentrantLock,所以它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap中,一個Segment就是一個子哈希表,Segment里維護了一個HashEntry數組,并發環境下,對于不同Segment的數據進行的操作是不用考慮鎖競爭的。(就按默認的ConcurrentLeve為16來講,理論上就允許16個線程并發執行)所以,對于同一個Segment的操作才需考慮線程同步,不同的Segment則無需考慮。Segment類似于HashMap,一個Segment維護著一個HashEntry數組。
transient volatile HashEntry<K,V>[] table;
HashEntry是目前我們提到的最小的邏輯處理單元了。一個ConcurrentHashMap維護一個Segment數組,一個Segment維護一個HashEntry數組。
static final class HashEntry<K, V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K, V> next;
//其他省略
}
我們說Segment類似哈希表,那么一些屬性就跟我們之前提到的HashMap差不多,比如負載因子loadFactor,比如閾值threshold等等,看下Segment的構造方法。
Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
this.loadFactor = lf;//負載因子
this.threshold = threshold;//閾值
this.table = tab;//主干數組即 HashEntry 數組
}
我們來看下ConcurrentHashMap的構造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException();
//MAX_SEGMENTS 為 1<<16=65536,也就是最大并發數為 65536
if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
//2 的 sshif 次方等于 ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
int sshift = 0;
//ssize 為 segments 數組長度,根據 concurrentLevel 計算得出
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//segmentShift 和 segmentMask 這兩個變量在定位 segment 時會用到,后面會詳細講
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
//計算 cap 的大小,即 Segment 中 HashEntry 的數組長度,cap 也一定為 2 的 n 次方.
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c) cap <<= 1;
//創建 segments 數組并初始化第一個 Segment,其余的 Segment 延遲初始化
Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
初始化方法有三個參數,如果用戶不指定則會使用默認值,initialCapacity為16,loadFactor為0.75(負載因子,擴容時需要參考),concurrentLevel為16。
從上面的代碼可以看出來,Segment數組的大小ssize是由concurrentLevel來決定的,但是卻不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次冪。比如:默認情況下concurrentLevel是16,則ssize為16;若concurrentLevel為14,ssize為16;若concurrentLevel為17,則ssize為32。為什么Segment的數組大小一定是2的次冪?其實主要是便于通過按位與的散列算法來定位Segment的index。其實,put方法對segment也會有所體現。
public V put(K key, V value) {
Segment<K, V> s;
//concurrentHashMap 不允許 key/value 為空
if (value == null)
throw new NullPointerException();
//hash 函數對 key 的 hashCode 重新散列,避免差勁的不合理的 hashcode,保證散列均勻
int hash = hash(key);
//返回的 hash 值無符號右移 segmentShift 位與段掩碼進行位運算,定位 segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K, V>) UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
● 從源碼看出,put的主要邏輯也就兩步:
定位segment并確保定位的Segment已初始化。
調用Segment的put方法。
Ps:關于segmentShift和segmentMask
segmentShift和segmentMask這兩個全局變量的主要作用是用來定位Segment。
int j = (hash >>> segmentShift) & segmentMask;
segmentMask:段掩碼,假如segments數組長度為16,則段掩碼為16-1=15;segments長度為32,段掩碼為32-1=31。這樣得到的所有bit位都為1,可以更好地保證散列的均勻性。
segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments長度為16,segmentShift=32-4=28;若segments長度為32,segmentShift=32-5=27。而計算得出的hash值最大為32位,無符號右移segmentShift,則意味著只保留高幾位(其余位是沒用的),然后與段掩碼segmentMask位運算來定位Segment。
ConcurrentLinkedQuere 方法。
● Get 操作:
public V get(Object key) {
Segment<K, V> s;
HashEntry<K, V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//先定位 Segment,再定位 HashEntry
if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value;
}
}
return null;
}
get方法無需加鎖,由于其中涉及到的共享變量都使用volatile修飾,volatile可以保證內存可見性,所以不會讀取到過期數據。
來看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加鎖的。只不過是鎖粒度細了而已。
● Put 操作:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K, V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
//tryLock 不成功時會遍歷定位到的 HashEnry 位置的鏈表(遍歷主要是為了使 CPU 緩存鏈表),
// 若找不到,則創建 HashEntry。tryLock 一定次數后(MAX_SCAN_RETRIES 變量決定),則lock。
// 若遍歷過程中,由于其他線程的操作導致鏈表頭結點變化,則需要重新遍歷。
V oldValue;
try {
HashEntry<K, V>[] tab = table;
int index = (tab.length - 1) & hash;//定位 HashEntry,可以看到,這個 hash 值在定位 Segment
//時和在 Segment 中定位 HashEntry 都會用到,只不過定位 Segment 時只用到高幾位。
HashEntry<K, V> first = entryAt(tab, index);
for (HashEntry<K, V> e = first; ; ) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
} else {
if (node != null) node.setNext(first);
else
node = new HashEntry<K, V>(hash, key, value, first);
int c = count + 1;
//若 c 超出閾值 threshold,需要擴容并 rehash。擴容后的容量是當前容量的 2 倍。這樣可以最大程
//度避免之前散列好的 entry 重新散列,具體在另一篇文章中有詳細分析,不贅述。擴容并 rehash 的這個過程是比較消耗資源的。
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
ConcurrentLinkedQuere 總結
ConcurrentHashMap作為一種線程安全且高效的哈希表的解決方案,尤其是其中的“分段鎖”的方案,相比HashTable的全表鎖在性能上的提升非常之大。本文對ConcurrentHashMap的實現原理進行了詳細分析,并解讀了部分源碼,希望能幫助到有需要的童鞋。