Appearance
JUC CAS与原子类
本文介绍CAS思想以及基于CAS的原子类。
1. CAS
CAS,全称Compare-And-Set(又可以称为Compare-and-Swap),是一种在并发编程中广泛使用的原子操作。CAS 操作包含三个操作数:一个内存位置(V)、预期的旧值(A)和更新值(B)。工作原理为:处理器会原子性地比较内存位置 V 中的当前值是否与预期旧值 A 相匹配。
- 如果匹配,则将该内存位置的值更新为新值 B。
- 如果不匹配(意味着其他线程已经修改了该值),则不执行任何操作。
CAS 是一种乐观锁的实现方式,常用于实现非阻塞算法,可以避免使用传统锁带来的开销,提高多线程程序的性能。
悲观锁:对数据被并发修改持“悲观”态度,它认为在整个数据处理过程中,其他线程“一定”会修改数据。因此,在操作数据之前,悲观锁会先获取锁,将数据锁定,使得其他试图访问或修改该数据的线程必须等待,直到锁被释放。
乐观锁:对数据被并发修改持”乐观“态度,认为在数据被处理期间,其他线程对数据的修改概率较低。因此,它在读取数据时并不会立即加锁,而是允许其他线程同时访问和修改数据。只有在提交更新时,乐观锁才会去检查在此期间数据是否被其他线程修改过。如果数据被修改了,则当前操作会失败(通常会回滚或重试)。
下面以一个例子演示CAS的使用,现有一个整数,有10个线程并发对其进行加一操作,每个线程循环1000次,则最后的值应该为10000。
java
private static AtomicInteger num = new AtomicInteger(0);
public static void main(String[] args) {
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
num.getAndIncrement();
}
});
threadList.add(thread);
thread.start();
}
// 主线程等待其他线程运行完毕
threadList.forEach(x->{
try {
x.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.info("{}", num);
}在上面的例子中,我们使用了AtomicInteger类,这是JUC提供给我们的工具类,其利用了CAS的思想,底层使用Unsafe类。
2. Unsafe类
Unsafe类是底层的类,是Java 的非标准 API,它的使用非常危险,可能导致 JVM 崩溃、内存损坏、安全漏洞等问题。不建议在普通的业务代码中使用它。
Unsafe类不能通过简单的方式获取,我们需要通过反射来获取:
java
private static final Unsafe UNSAFE;
static {
// 通过反射获取 Unsafe 的实例
Field theUnsafe = null;
try {
theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException|IllegalAccessException e) {
throw new RuntimeException(e);
}
}在Unsafe类中提供了CAS方法:
java
public final boolean compareAndSwapInt(Object o, long offset,
int expected,
int x)
public final boolean compareAndSwapLong(Object o, long offset,
long expected,
long x)
public final boolean compareAndSwapObject(Object o, long offset,
Object expected,
Object x)这些方法的参数含义都相同,以compareAndSwapInt为例:
Object o和long offset:共同决定内存位置int expected:预期旧值int x:更新值
现在,我们以Unsafe来实现多线程并发对一个数执行加一操作:
java
@Slf4j
public class UnsafeClassObjectAddDemo {
private static final Unsafe UNSAFE;
static {
// 通过反射获取 Unsafe 的实例
Field theUnsafe = null;
try {
theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException|IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws NoSuchFieldException {
Field numField = Counter.class.getDeclaredField("num");
// 获取字段的偏移量
long offset = UNSAFE.objectFieldOffset(numField);
Counter counter = new Counter();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
while (true){
int prev = counter.getNum();
int newNum = counter.getNum() + 1;
// CAS操作
if (UNSAFE.compareAndSwapInt(counter, offset, prev, newNum)) {
break;
}
}
}
});
threadList.add(thread);
thread.start();
}
threadList.forEach(x-> {
try {
x.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.info("{}", counter.getNum());
}
}
@Data
class Counter{
private volatile int num = 0;
}如果字段是静态变量,则需要使用如下的方式:
- 使用
unsafe.staticFieldBase(Field f)获取基对象。 - 使用
unsafe.staticFieldOffset(Field f)获取偏移量。 - 操作方法如
unsafe.compareAndSwapInt(Object base, long offset, int expected, int x),其中第一个参数是staticFieldBase()返回的基对象,第二个参数是staticFieldOffset()返回的偏移量。
例如:
Details
java
@Slf4j
public class UnsafeClassAddDemo {
private static volatile int num = 0;
private static final Unsafe UNSAFE;
static {
// 通过反射获取 Unsafe 的实例
Field theUnsafe = null;
try {
theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException|IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws NoSuchFieldException {
Field numField = UnsafeClassAddDemo.class.getDeclaredField("num");
Object o = UNSAFE.staticFieldBase(numField);
long offset = UNSAFE.staticFieldOffset(numField);
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
while (true){
int prev = num;
int newNum = num + 1;
if (UNSAFE.compareAndSwapInt(o, offset, prev, newNum)) {
break;
}
}
}
});
threadList.add(thread);
thread.start();
}
threadList.forEach(x-> {
try {
x.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.info("{}", num);
}
}上面的例子中,我们都在num字段上加了volatile,其实,不加好像也是可以的,是因为:compareAndSwapInt(Object o, long offset, int expected, int x): 这个方法在成功时,同时提供了读屏障和写屏障的语义。它既保证了在执行 CAS 之前,当前线程的所有写入对其他线程可见(Release),也保证了在 CAS 成功后,当前线程能看到其他线程在该变量之前的所有写入(Acquire)。失败时,只提供读屏障语义。
但是,在AtomicInteger中是加了volatile的,这是因为:
- 清晰地表达意图:即使技术上
Unsafe的方法已提供保障,将一个会被多个线程并发修改的字段标记为volatile,能更清晰地向其他开发者表明这个变量是共享的、需要特殊处理的。这是一种代码层面的文档。 - 并非所有操作都使用
Unsafe原子方法:虽然可以在 CAS 循环中只使用Unsafe的原子方法,但如果其他部分的代码需要直接访问这个字段,volatile关键字仍然可以提供标准的可见性保证。
3. 原子类
JUC 提供的原子类利用CAS思想,保证单个方法的原子性(所谓单个方法的原子性,是指当调用这些方法时,操作会一次性完成,不会被其他线程的操作打断。)。
3.1 AtomicInteger
我们以AtomicInteger为例,讲解基本数据类型的原子类使用方式。
构造方法:
AtomicInteger():创建一个初始值为0的原子整数;AtomicInteger(int initialValue):创建以指定值为初始值的原子整数;
常用原子方法:
get(): 获取当前值。set(int newValue): 设置新值。getAndSet(int newValue): 原子地设置新值,并返回旧值。compareAndSet(int expect, int update): 如果当前值等于expect,则原子地将值设置为update。成功返回true,否则返回false。这是许多其他原子操作的基础。incrementAndGet(): 原子地将当前值加 1,并返回新值,类似于++i。getAndIncrement(): 原子地将当前值加 1,并返回旧值,类似于i++。decrementAndGet(): 原子地将当前值减 1,并返回新值,类似于--i。getAndDecrement(): 原子地将当前值减 1,并返回旧值,类似于i--。addAndGet(int delta): 原子地将delta加到当前值,并返回新值。getAndAdd(int delta): 原子地将delta加到当前值,并返回旧值。
除了对值进行加减操作,也可以进行任意运算:
updateAndGet(IntUnaryOperator updateFunction):原子地对值进行运算,并返回新值。其中
IntUnaryOperator是一个函数式接口,接收一个值返回一个值。getAndUpdate(IntUnaryOperator updateFunction):原子地对值进行运算,并返回旧值。
下面以例子展示updateAndGet的用法:
java
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(6);
// 对值进行乘法运算
int r = atomicInteger.updateAndGet(x -> x * 10);
System.out.println("return : " + r);
System.out.println("atomic integer : " + atomicInteger.get());
}结果:
txt
return : 60
atomic integer : 603.2 AtomicReference
AtomicReference<V> 是 java.util.concurrent.atomic 包中提供的一个原子类,用于对对象引用 (Object Reference) 进行原子操作。这里的 <V> 是一个泛型,表示它可以持有任何类型的对象引用。
常用的原子操作方法:
get(): 原子地获取当前持有的对象引用。set(V newValue): 原子地将持有的引用设置为newValue。getAndSet(V newValue): 原子地将持有的引用设置为newValue,并返回更新前的旧引用。compareAndSet(V expect, V update)- 检查当前持有的引用是否等于
expect对象(使用==进行引用相等性比较)。 - 如果等于
expect,则原子地将持有的引用更新为update对象。 - 返回一个布尔值,表示是否成功进行了更新 (
true表示成功,false表示失败)。
- 检查当前持有的引用是否等于
注意事项:
AtomicReference保证的是引用本身的操作是原子性的,也就是说,改变它指向哪个对象的这个动作是原子的。AtomicReference的compareAndSet是基于引用相等性 (==) 进行比较的,而不是对象内容的相等性 (equals())。
注意事项案例如下:
java
@Slf4j
public class AtomicReferenceDemo {
public static void main(String[] args) {
AtomicReference<Counter> atomicReference = new AtomicReference<>(new Counter(1));
new Thread(()->{
Counter prev = atomicReference.get();
Sleeper.sleep(2000);
boolean b = atomicReference.compareAndSet(prev, new Counter(10));
log.info("更新结果:{}, counter: {}", b, atomicReference.get().getNum());
},"t1").start();
Sleeper.sleep(1000);
log.info("将counter值改为2");
atomicReference.get().setNum(2);
log.info("{}", atomicReference.get().getNum());
}
}
@Data
@AllArgsConstructor
class Counter{
private int num;
}结果:
txt
13:26:06.911 [main] INFO : 将counter值改为2
13:26:06.914 [main] INFO : 2
13:26:07.906 [t1] INFO : 更新结果:true, counter: 10可以发现,即使主线程将counter的值改为了2,t1线程使用CAS也可以修改成功,因为AtomicReference指向的对象引用地址并没有发生变化。
3.3 ABA问题和AtomicStampedReference
所谓ABA问题,就是一个线程读取了引用 A,然后另一个线程将引用从 A 改为 B,再改回 A,第一个线程再次执行 compareAndSet 时,会认为引用没有发生变化,从而成功执行操作。这在某些场景下可能导致问题。
例如:
java
public static void main(String[] args) {
AtomicReference<String> atomicReference = new AtomicReference<>("A");
new Thread(()->{
String prev = atomicReference.get();
Sleeper.sleep(2000);
boolean c = atomicReference.compareAndSet(prev, "C");
log.info("修改结果:{}, 值为:{}", c, atomicReference.get());
},"t1").start();
Sleeper.sleep(500);
boolean b = atomicReference.compareAndSet(atomicReference.get(), "B");
log.info("修改结果:{}, {}", b, atomicReference.get());
Sleeper.sleep(500);
boolean r = atomicReference.compareAndSet(atomicReference.get(), "A");
log.info("修改结果:{}, {}", r, atomicReference.get());
}结果如下:
java
13:40:50.827 [main] INFO : 修改结果:true, B
13:40:51.334 [main] INFO : 修改结果:true, A
13:40:52.324 [t1] INFO : 修改结果:true, 值为:C可以看到,t1线程最终修改成功,即预期值与引用最新值是相等的,即使中间有了修改,t1线程也是不知道的。
我们可以使用AtomicStampedReference来解决ABA问题,即引入版本号的概念,每次修改都增加一个版本号,不仅需要比较最新值,也要比较版本号。
java
public static void main(String[] args) {
// 构造方法传入初始版本号
AtomicStampedReference<String> atomicReference = new AtomicStampedReference<>("A", 0);
new Thread(()->{
String prev = atomicReference.getReference();
// 获取版本号
int stamp = atomicReference.getStamp();
Sleeper.sleep(2000);
// 不仅比较预期值,也要比较版本号
boolean c = atomicReference.compareAndSet(prev, "C", stamp, stamp + 1);
log.info("修改结果:{}, 值为:{}", c, atomicReference.getReference());
},"t1").start();
Sleeper.sleep(500);
boolean b = atomicReference.compareAndSet(atomicReference.getReference(), "B", atomicReference.getStamp(), atomicReference.getStamp() + 1);
log.info("修改结果:{}, {}", b, atomicReference.getReference());
Sleeper.sleep(500);
boolean r = atomicReference.compareAndSet(atomicReference.getReference(), "A", atomicReference.getStamp(), atomicReference.getStamp() + 1);
log.info("修改结果:{}, {}", r, atomicReference.getReference());
}3.4 AtomicIntegerArray
AtomicIntegerArray 是 java.util.concurrent.atomic 包提供的另一个原子类,它允许在一个 int 类型的数组中的每个元素上执行原子操作。
简单来说,如果有一个 int[] 数组,并且多个线程需要同时对数组中的某个或多个位置的元素进行读取、写入或修改(如自增、自减),并且需要确保这些操作是线程安全的,那么 AtomicIntegerArray 就是一个非常合适的选择。
它提供了与 AtomicInteger 类似的一系列原子方法,但这些方法都作用于数组中的指定索引位置。
构造方法:
可以指定数组的长度来创建一个新的 AtomicIntegerArray,所有元素默认初始化为 0:
java
// 创建一个长度为10的AtomicIntegerArray
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);也可以用一个现有的 int[] 数组来初始化,会复制数组的内容:
java
int[] initialArray = {1, 2, 3, 4, 5};
// 使用现有数组初始化
AtomicIntegerArray atomicArray = new AtomicIntegerArray(initialArray);常用方法:
int length():获取数组长度;int get(int i):获取数组指定索引处的值;void set(int i, int newValue):设置数组指定索引处的值;boolean compareAndSet(int i, int expectedValue, int newValue):原子地对数组指定索引处的值进行CAS操作;int getAndAdd(int i, int delta):原子地对数组指定索引处的值增加delta;int getAndUpdate(int i, IntUnaryOperator updateFunction):原子地对数组指定索引处的值进行运算;
3.5 字段更新器
字段更新器可以原子地操作指定对象的某个 volatile 字段。
字段更新器有三个:
AtomicReferenceFieldUpdater<T, V>:用于原子更新指定对象的volatile 引用类型字段。T: 是对象的类型。V: 是字段的类型(即引用的类型)。
AtomicIntegerFieldUpdater<T>:用于原子更新指定对象的volatile int字段。AtomicLongFieldUpdater<T>:用于原子更新指定对象的volatile long字段。
为什么需要字段更新器?
在某些情况下,可能有一个已存在的类,其中包含一个 volatile 字段,摒弃希望对其进行原子操作,但可能:
- 无法或不想修改类的定义: 例如,类可能来自第三方库,或者修改会影响大量现有代码。
- 出于内存考虑: 使用字段更新器通常比将字段类型从
V改为AtomicReference<V>更节省内存,因为不需要为每个对象创建一个额外的AtomicReference对象。 - API 设计: 有时 API 需要接受或返回普通对象,而不是原子封装类。
接下来以AtomicReferenceFieldUpdater为例讲解字段更新器的使用。
首先是创建字段更新器,字段更新器是抽象类,不能直接实例化。需要使用其静态工厂方法 newUpdater 来创建实例:
java
public static <U, W> AtomicReferenceFieldUpdater<U, W> newUpdater(
Class<U> tclass,
Class<W> vclass,
String fieldName);tclass: 包含字段的类的Class对象。vclass: 字段的类型的Class对象。fieldName: 字段的名称(字符串)。
要能使用字段更新器,目标字段必须满足以下条件:
- 必须是
volatile修饰的。 - 不能是
static修饰的(字段更新器用于实例字段)。 - 不能是
final修饰的(final字段不能改变)。 - 字段所在的类以及字段本身,对于创建和使用更新器的代码必须是可见的。如果是
private字段,那么创建和使用更新器的代码通常必须在同一个类中。
AtomicReferenceFieldUpdater的常用方法:
get(T obj): 原子地获取obj对象的字段值。set(T obj, V newValue): 原子地设置obj对象的字段值为newValue。getAndSet(T obj, V newValue): 原子地设置obj对象的字段值为newValue,并返回旧值。compareAndSet(T obj, V expect, V update): 原子地比较obj对象的字段值与expect,如果相等则更新为update。
下面以一个例子说明字段更新器的使用:
java
public class UpdaterDemo {
public static void main(String[] args) {
MyClass obj = new MyClass("Pending");
boolean updated = obj.compareAndSetStatus("Pending", "Processing");
System.out.println("Update successful: " + updated); // true
updated = obj.compareAndSetStatus("Pending", "Done");
System.out.println("Update successful: " + updated); // false
System.out.println("Current status: " + obj.getStatus()); // Processing
}
}
class MyClass {
volatile String status;
private static final AtomicReferenceFieldUpdater<MyClass, String> STATUS_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(MyClass.class, String.class, "status");
public MyClass(String initialStatus) {
this.status = initialStatus;
}
public boolean compareAndSetStatus(String expect, String update) {
return STATUS_UPDATER.compareAndSet(this, expect, update);
}
public String getStatus() {
return status;
}
}3.6 原子累加器
LongAdder 是 java.util.concurrent.atomic 包在 Java 8 中引入的一个类,它是为了解决在高并发环境下 AtomicLong 在更新操作时可能遇到的性能瓶颈而设计的。
AtomicLong 的在高并发下的问题:
AtomicLong 内部维护一个 long 值,所有对该值的更新操作(如 incrementAndGet()、addAndGet() 等)都是通过原子地 CAS (Compare-And-Swap) 操作来完成的。在低到中等程度的并发下,AtomicLong 表现良好。但是,如果在极高并发的情况下,大量线程同时尝试更新同一个 AtomicLong 变量,会导致大量的 CAS 操作失败。失败的线程会不断重试,这会极大地增加 CPU 的开销和总线上的竞争,形成严重的“热点”,降低整体系统的吞吐量。
LongAdder 的设计理念:
LongAdder 的设计思想是分而治之。它不像 AtomicLong 那样只有一个单一的计数器,而是在内部维护一个或多个变量(通常称为“单元 Cells”)来共同组成最终的计数。
- 基本值 (base): 在竞争不激烈时,更新操作会直接作用于一个基本值,类似于
AtomicLong。 - 单元数组 (cells): 当出现竞争时,
LongAdder会引入一个动态大小的内部数组。每个单元 (Cell) 也是一个独立的计数器,通过 CAS 进行更新。 - 分散写入: 当多个线程并发更新时,
LongAdder会尝试将不同的线程引导到更新不同的单元上。这通常是通过哈希线程 ID 或其他分散策略来实现的。这样,大部分线程就可以无锁地更新各自对应的单元,从而大大减少了对单一变量的竞争。
下面以一个例子说明LongAdder的使用,开启100个线程,每个线程进行50万次累加操作:
java
@Slf4j
public class LongAdderDemo {
public static void main(String[] args) {
testLongAdder();
testAtomicLong();
}
private static void testAtomicLong(){
long start = System.nanoTime();
AtomicLong atomicLong = new AtomicLong();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 500000; j++) {
atomicLong.incrementAndGet();
}
});
threadList.add(thread);
thread.start();
}
threadList.forEach(x-> {
try {
x.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
long end = System.nanoTime();
log.info("AtomicLong {}, 耗时:{}毫秒", atomicLong.get(), TimeUnit.NANOSECONDS.toMillis(end - start));
}
private static void testLongAdder() {
long start = System.nanoTime();
LongAdder longAdder = new LongAdder();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 500000; j++) {
longAdder.increment();
}
});
threadList.add(thread);
thread.start();
}
threadList.forEach(x-> {
try {
x.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
long end = System.nanoTime();
log.info("LongAdder {}, 耗时:{}毫秒", longAdder.sum(), TimeUnit.NANOSECONDS.toMillis(end - start));
}
}结果:
txt
14:50:21.618 [main] INFO : LongAdder 50000000, 耗时:98毫秒
14:50:22.661 [main] INFO : AtomicLong 50000000, 耗时:1041毫秒可以看到LongAdder比AtomicLong效率高。