首頁/ 汽車/ 正文

java併發執行緒深入理解CAS以及ABA問題的處理

一、什麼是CAS

CAS(Compare And Swap,比較並交換),通常指的是這樣一種原子操作:針對一個變數,首先比較它的記憶體值與某個期望值是否相同,如果相同,就給它賦一個新值。

CAS實現過程如下圖

1、一個初始值變數V,值為5;一開始先讀取V實際記憶體中的值賦值給E

2、比如我們需要給最原始的V+1操作,那麼此時用E+1來進行操作(這是防止V在其他執行緒已經被改變),這樣完成了U=E+1的操作

3、判斷E和V的值是否一致,如果一致則證明在以上操作過程中V沒有被其他執行緒改變則將U的值賦值給V,如果不一致那V就被其他改變了,這樣給U的+1操作就不成立,返回當前的V。

java併發執行緒深入理解CAS以及ABA問題的處理

CAS 的邏輯用虛擬碼描述如下:

if (value == expectedValue) { value = newValue;

以上虛擬碼描述了一個由比較和賦值兩階段組成的複合操作,CAS 可以看作是它們合併後的整體——一個不可分割的原子操作,並且其原子性是直接在硬體層面得到保障的。

CAS可以看做是樂觀鎖(對比資料庫的悲觀、樂觀鎖)的一種實現方式,Java原子類中的遞增操作就透過CAS自旋實現的。

CAS是一種無鎖演算法,在不使用鎖(沒有執行緒被阻塞)的情況下實現多執行緒之間的變數同步。

1-2、CAS應用

在java併發執行緒中,我們可以使用以下方式進行加鎖處理:\

1、synchronize

在併發競爭比較激烈的時候不推薦使用,會讓未獲得鎖的執行緒阻塞,因為會切換到核心態,進行park,這樣就會有效能問題

2、使用ReentrantLock

lock。lock(加鎖) lock。unlock(解鎖),需要注意的是unlock需要放到finally中,防止程式碼塊異常,丟擲後鎖一直存在。

3、CAS也可以實現執行緒鎖機制。

在 Java 中,CAS 操作是由 Unsafe 類提供支援的,該類定義了三種針對不同型別變數的 CAS 操作,如圖

java併發執行緒深入理解CAS以及ABA問題的處理

它們都是 native 方法,由 Java 虛擬機器提供具體實現,這意味著不同的 Java 虛擬機器對它們的實現可能會略有不同。

以 compareAndSwapInt 為例,Unsafe 的 compareAndSwapInt 方法接收 4 個引數,分別是:物件例項、記憶體偏移量、欄位期望值、欄位新值。該方法會針對指定物件例項中的相應偏移量的欄位執行 CAS 操作。

1-3、併發鎖的實現

1-3-1、使用synchronized

我們都知道用synchronized可以實現鎖機制,但是在併發量大的時候,會**鎖競爭的現象,這樣就會涉及到使用者態到核心態的切換**,這是極其耗費效能的,因此建議在併發量不大的情況下,可以使用synchronized進行加鎖。併發量大的時候,不建議使用

public class ThreadLockSynchronized { private volatile static int count = 0; static Object object = “”; public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { synchronized (object) { for (int j = 0; j < 10000; j++) { count++; } } } }); thread。start(); } try { Thread。sleep(1000); } catch (InterruptedException e) { e。printStackTrace(); } System。out。println(“count:” + count); }}

以上程式碼測試結果如下:

java併發執行緒深入理解CAS以及ABA問題的處理

1-3-2、使用ReentrantLock

使用ReentrantLock的時候一定要注意,

要將unlock()放到finally程式碼塊中,防止業務程式碼異常,無法釋放鎖

public class ThreadLockReentrantLock { private volatile static int count = 0; static ReentrantLock reentrantLock=new ReentrantLock(); public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { //使用reentrantLock。lock();進行加鎖操作 reentrantLock。lock(); try { for (int j = 0; j < 10000; j++) { count++; } } finally { //reentrantLock。unlock();一定要放到finally中,防止業務程式碼異常,導致鎖不釋放 reentrantLock。unlock(); } } }); thread。start(); } try { Thread。sleep(1000); } catch (InterruptedException e) { e。printStackTrace(); } System。out。println(“count:” + count); }}

1-3-3、使用CAS

1-3-3-1、透過反射獲得Unsafe

Unsafe是jdk提供的工具類,我們要使用需要透過反射機制取到,同時獲得其獲取偏移量的操作

public class UnsafeFactory { /** * 獲取 Unsafe 物件 * @return */ public static Unsafe getUnsafe() { try { Field field = Unsafe。class。getDeclaredField(“theUnsafe”); field。setAccessible(true); return (Unsafe) field。get(null); } catch (Exception e) { e。printStackTrace(); } return null; } /** * 獲取欄位的記憶體偏移量 * @param unsafe * @param clazz * @param fieldName * @return */ public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) { try { return unsafe。objectFieldOffset(clazz。getDeclaredField(fieldName)); } catch (NoSuchFieldException e) { throw new Error(e); } }}

1-3-3-2、透過CAS實現對變數的修改

首先定義一個物件Entity,其中定義一個int型別變數X,然後透過CAS對X進行修改。

public class CASTest { public static void main(String[] args) { Entity entity = new Entity(); //透過反射機制獲得Unsafe Unsafe unsafe = UnsafeFactory。getUnsafe(); //獲得x記憶體中的偏移量 long offset = UnsafeFactory。getFieldOffset(unsafe, Entity。class, “x”); System。out。println(offset); boolean successful; // 4個引數分別是:物件例項、欄位的記憶體偏移量、欄位原值、欄位更新值 //透過CAS將x由0改為3 successful = unsafe。compareAndSwapInt(entity, offset, 0, 3); System。out。println(successful + “\t” + entity。x); //透過CAS將x由3改為5 successful = unsafe。compareAndSwapInt(entity, offset, 3, 5); System。out。println(successful + “\t” + entity。x); //透過CAS將x由3改為8——本條是不成立的,上面已經將x改為5 successful = unsafe。compareAndSwapInt(entity, offset, 3, 8); System。out。println(successful + “\t” + entity。x); }}class Entity{ int x;}

執行結果如下:

首先獲得X的偏移量為12,其次就是對x進行修改的結果列印。第三次修改失敗,因為第二次已經將x改為5,如果在用第一次的3作為原值去修改,就會修改失敗。

java併發執行緒深入理解CAS以及ABA問題的處理

1-3-3-3、CAS特點

根據以上執行結果,就證明了CAS的特點:

先比較、後更新

,這兩步,底層會幫助我們實現

原子操作,有序性和可見性

避免上線文切換。

1-4、CAS原始碼分析

1-4-1、java層程式碼

Unsafe類中提供了三種CAS操作如下,這三種都是native方法,都是Hotspot程式碼,

java併發執行緒深入理解CAS以及ABA問題的處理

下面用compareAndSwapInt方法來舉例說明

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

方法引數:

1、物件例項

2、欄位記憶體值的偏移量(根據物件例項和偏移量就可以獲得對應的變數)

3、期望物件例項中的原值

4、欄位更新值

1-4-2、Hotspot層

1-4-2-1、Unsafe_CompareAndSwapInt方法

呼叫Hotspot方法原始碼如下:

#unsafe。cppUNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))UnsafeWrapper(“Unsafe_CompareAndSwapInt”);oop p = JNIHandles::resolve(obj);// 根據偏移量,計算value的地址jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);// Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值 e:要比較的值//cas成功,返回期望值e,等於e,此方法返回true //cas失敗,返回記憶體中的value值,不等於e,此方法返回falsereturn (jint)(Atomic::cmpxchg(x, addr, e)) == e;

1-4-2-1-1、方法解析

首先呼叫的方法

Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)

前面兩個值是hotspot傳入的值,後面四個值為java方法傳入進來的。

其次根據偏移量,計算value的地址

//獲得物件oop p = JNIHandles::resolve(obj); // 根據偏移量,計算value的地址 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

然後呼叫Atomic::cmpxchg實現CAS操作

// Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值 e:要比較的值//cas成功,返回期望值e,等於e,此方法返回true //cas失敗,返回記憶體中的value值,不等於e,此方法返回falsereturn (jint)(Atomic::cmpxchg(x, addr, e)) == e;

1-4-2-2、Atomic::cmpxchg方法

需要注意下面的程式碼是Linux_x86,不同系統處理CAS是不同的

#atomic_linux_x86。inline。hppinline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {//判斷當前執行環境是否為多處理器環境int mp = os::is_MP();//LOCK_IF_MP(%4) 在多處理器環境下,為 cmpxchgl 指令新增 lock 字首,以達到記憶體屏障的效果//cmpxchgl 指令是包含在 x86 架構及 IA-64 架構中的一個原子條件指令,//它會首先比較 dest 指標指向的記憶體值是否和 compare_value 的值相等,//如果相等,則雙向交換 dest 與 exchange_value,否則就單方面地將 dest 指向的記憶體值交給exchange_value。//這條指令完成了整個 CAS 操作,因此它也被稱為 CAS 指令。__asm__ volatile (LOCK_IF_MP(%4) “cmpxchgl %1,(%3)”: “=a” (exchange_value): “r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp): “cc”, “memory”);return exchange_value;

cmpxchgl的詳細執行過程:

首先,輸入是“r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表示compare_value存入eax暫存器,而exchange_value、dest、mp的值存入任意的通用暫存器。嵌入式彙編規定把輸出和輸入暫存器按統一順序編號,順序是從輸出暫存器序列從左到右從上到下以“%0”開始,分別記為%0、%1···%9。也就是說,輸出的eax是%0,輸入的exchange_value、compare_value、dest、mp分別是%1、%2、%3、%4。

因此,cmpxchg %1,(%3)實際上表示cmpxchg exchange_value,(dest)

需要注意的是cmpxchg有個隱含運算元eax,其實際過程是先比較eax的值(也就是compare_value)和dest地址所存的值是否相等,

輸出是“=a” (exchange_value),表示把eax中存的值寫入exchange_value變數中。

Atomic::cmpxchg這個函式最終返回值是exchange_value,也就是說,如果cmpxchgl執行時compare_value和dest指標指向記憶體值相等則會使得dest指標指向記憶體值變成exchange_value,最終eax存的compare_value賦值給了exchange_value變數,即函式最終返回的值是原先的compare_value。此時Unsafe_CompareAndSwapInt的返回值(jint)(Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl執行時compare_value和(dest)不等則會把當前dest指標指向記憶體的值寫入eax,最終輸出時賦值給exchange_value變數作為返回值,導致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到false,表明CAS失敗。

現代處理器指令集架構基本上都會提供 CAS 指令,例如 x86 和 IA-64 架構中的 cmpxchgl 指令和 comxchgq 指令,sparc 架構中的 cas 指令和 casx 指令。

不管是 Hotspot 中的 Atomic::cmpxchg 方法,還是 Java 中的 compareAndSwapInt 方法,它們本質上都是對相應平臺的 CAS 指令的一層簡單封裝。CAS 指令作為一種硬體原語,有著天然的原子性,這也正是 CAS 的價值所在。

1-5、透過CAS實現鎖

我們再回到一開始的程式碼中,該如何實現鎖呢

public class ThreadLockCAS { private volatile static int count = 0; public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 10000; j++) { count++; } } }); thread。start(); } try { Thread。sleep(1000); } catch (InterruptedException e) { e。printStackTrace(); } System。out。println(“count:” + count); }}

1-5-1、實現思路

透過上對CAS的理解,我們就可以這麼操作:

1、設定一箇中間值為0

2、第一個執行緒進來的時候透過CAS將中間值改為1

3、其他執行緒進來 再次準備將0改為1的時候就會失敗

4、第一個執行緒業務操作完畢,再次透過cas將中間值改為0

5、下一個執行緒再次透過CAS將0改為1就會成功,獲得鎖

1-5-2、實現一個CAS比較與交換的幫助類

1-5-2-1、實現加鎖的類

其中UnsafeFactory類的程式碼在上面已經貼過,這個地方就不再貼了。這個類中主要就是cas()這個方法。這就是CAS方式加鎖的操作。

state就是作為改變交換的值。

public class CASLock { private volatile int state; private static final Unsafe UNSAFE; private static final long OFFSET; static { try { UNSAFE= UnsafeFactory。getUnsafe(); OFFSET=UnsafeFactory。getFieldOffset(UNSAFE,CASLock。class,“state”); } catch (Exception e) { throw new Error(e); } } //設定CAS操作 public boolean cas(){ return UNSAFE。compareAndSwapInt(this,OFFSET,0,1); } public int getState() { return state; } public void setState(int state) { this。state = state; }}

1-5-2-2、使用鎖實現

如下程式碼,在第一個for迴圈中建立執行緒,然後執行緒中實現了一個自旋操作,這樣第一個執行緒進入的鎖之後,其他執行緒都在進行自旋操作,等第一個執行緒透過casLock。setState(0),釋放鎖的時候,下一個執行緒casLock。getState()==0 && casLock。cas()就可以成立。

public class ThreadLockCAS { private volatile static int count = 0; static CASLock casLock = new CASLock(); public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { for (; ; ) { //state=0 if (casLock。getState() == 0 && casLock。cas()) { try { for (int j = 0; j < 10000; j++) { count++; } } finally { casLock。setState(0); } break; } } }); thread。start(); } try { Thread。sleep(3000); } catch (InterruptedException e) { e。printStackTrace(); } System。out。println(count); }}

雖然透過以上程式碼實現了加鎖操作,由於使用了自旋操作,這樣等待的執行緒就會一直空轉,消耗CPU資源,顯然這樣得實現方式是不太友好的

1-6、CAS缺陷

CAS 雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方面:

自旋 CAS 長時間地不成功,則會給 CPU 帶來非常大的開銷

只能保證一個共享變數原子操作

ABA 問題

1-6-1、ABA問題及解決方案

CAS演算法實現一個重要前提需要取出記憶體中某時刻的資料,而在下時刻比較並替換,那麼在這個時間差類會導致資料的變化。

1-6-1-1、什麼是ABA問題

當有多個執行緒對一個原子類進行操作的時候,某個執行緒在短時間內將原子類的值A修改為B,又馬上將其修改為A,此時其他執行緒不感知,還是會修改成功。

java併發執行緒深入理解CAS以及ABA問題的處理

測試ABA問題

程式碼執行過程:

第一個執行緒要從1改為3,進入執行緒之後等待1秒。

第二個執行緒將1改為2,然後又將2改為1

第一個執行緒從1改為3,(因為原值還是1,這樣就修改成功了)-

實際上此1非彼1

public class ABATest { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(1); new Thread(() -> { int value = atomicInteger。get(); System。out。println(“Thread1 read value: ” + value); // 阻塞1s LockSupport。parkNanos(1000000000L); // Thread1透過CAS修改value值為3 if (atomicInteger。compareAndSet(value, 3)) { System。out。println(“Thread1 update from ” + value + “ to 3”); } else { System。out。println(“Thread1 update fail!”); } }, “Thread1”)。start(); new Thread(() -> { int value = atomicInteger。get(); System。out。println(“Thread2 read value: ” + value); // Thread2透過CAS修改value值為2 if (atomicInteger。compareAndSet(value, 2)) { System。out。println(“Thread2 update from ” + value + “ to 2”); // do something value = atomicInteger。get(); System。out。println(“Thread2 read value: ” + value); // Thread2透過CAS修改value值為1 if (atomicInteger。compareAndSet(value, 1)) { System。out。println(“Thread2 update from ” + value + “ to 1”); } } }, “Thread2”)。start(); }}

執行結果如下:

Thread1不清楚Thread2對value的操作,誤以為value=1沒有修改過

java併發執行緒深入理解CAS以及ABA問題的處理

1-6-1-2、ABA問題的解決方案

資料庫有個鎖稱為樂觀鎖,是一種基於資料版本實現資料同步的機制,每次修改一次資料,版本就會進行累加。

AtomicStampedReference解決CAS的ABA問題

同樣,Java也提供了相應的原子引用類AtomicStampedReference

java併發執行緒深入理解CAS以及ABA問題的處理

reference即我們實際儲存的變數,stamp是版本,每次修改可以透過+1保證版本唯一性。這樣就可以保證每次修改後的版本也會往上遞增。

AtomicMarkableReference解決CAS的ABA問題

可以理解為上面AtomicStampedReference的簡化版,就是不關心修改過幾次,僅僅關心是否修改過。因此變數mark是boolean型別,僅記錄值是否有過修改。

java併發執行緒深入理解CAS以及ABA問題的處理

使用AtomicStampedReference,利用版本號解決ABA問題

public class AtomicStampedReferenceTest { public static void main(String[] args) { // 定義AtomicStampedReference Pair。reference值為1, Pair。stamp為1 AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1); new Thread(()->{ int[] stampHolder = new int[1]; int value = (int) atomicStampedReference。get(stampHolder); int stamp = stampHolder[0]; System。out。println(“Thread1 read value: ” + value + “, stamp: ” + stamp); // 阻塞1s LockSupport。parkNanos(1000000000L); // Thread1透過CAS修改value值為3 stamp是版本,每次修改可以透過+1保證版本唯一性 if (atomicStampedReference。compareAndSet(value, 3,stamp,stamp+1)) { System。out。println(“Thread1 update from ” + value + “ to 3”); } else { System。out。println(“Thread1 update fail!”); } },“Thread1”)。start(); new Thread(()->{ int[] stampHolder = new int[1]; int value = (int)atomicStampedReference。get(stampHolder); int stamp = stampHolder[0]; System。out。println(“Thread2 read value: ” + value+ “, stamp: ” + stamp); // Thread2透過CAS修改value值為2 if (atomicStampedReference。compareAndSet(value, 2,stamp,stamp+1)) { System。out。println(“Thread2 update from ” + value + “ to 2”); // do something value = (int) atomicStampedReference。get(stampHolder); stamp = stampHolder[0]; System。out。println(“Thread2 read value: ” + value+ “, stamp: ” + stamp); // Thread2透過CAS修改value值為1 if (atomicStampedReference。compareAndSet(value, 1,stamp,stamp+1)) { System。out。println(“Thread2 update from ” + value + “ to 1”); } } },“Thread2”)。start(); }}

執行結果

java併發執行緒深入理解CAS以及ABA問題的處理

總結:

本篇文章主要講述CAS的加鎖的實現方式,以及透過使用AtomicStampedRefrence和AtomicMarkableReference解決ABA的問題

作者:Jony_zhang

連結:https://juejin。cn/post/7150628427761418247

相關文章

頂部