Atomic包与CAS概述

1、什么是Atomic包

java.util.concurrent.atomic 包是Java提供的一组线程安全的原子操作类,可以在不使用锁的情况下实现线程安全操作。这些类主要分为以下几类:

  • 基本类型:AtomicIntegerAtomicLongAtomicBoolean
  • 引用类型:AtomicReferenceAtomicStampedReferenceAtomicMarkableReference
  • 数组类型:AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray
  • 累加器:LongAdderDoubleAdder(Java 8+)

2、什么是CAS

CAS(Compare-And-Swap)是一种无锁算法,包含三个操作数:

  • 内存位置(V)
  • 预期原值(A)
  • 新值(B)

CAS操作逻辑:当且仅当V的值等于A时,才会将V的值更新为B,否则不执行任何操作。无论哪种情况,都会返回V的当前值。

Atomic类和CAS的关系

Java 的 java.util.concurrent.atomic 包中的所有原子类(如 AtomicIntegerAtomicReference)底层均基于 CAS 机制 实现。

  • Atomic类 是暴露给开发者使用的 工具
  • CAS 是这些工具背后的 核心算法
// AtomicInteger 的源码片段(JDK)
public final int incrementAndGet() {
    return U.getAndAddInt(this, VALUE, 1) + 1;
}
// 最终调用的是Unsafe类的CAS操作:
unsafe.compareAndSwapInt(obj, offset, expect, update);

Atomic 包的 所有能力 均围绕 CAS 展开:

  • 原子自增AtomicInteger.getAndIncrement() → CAS 循环
  • 条件更新AtomicReference.compareAndSet() → 直接调用 CAS
  • 复杂累加LongAdder → 分段 CAS 优化
AtomicInteger counter = new AtomicInteger(0);

// 底层CAS实现伪代码:
int oldValue;
do {
    oldValue = counter.get(); // 读取当前值
} while (!counter.compareAndSet(oldValue, oldValue + 1)); // CAS尝试更新

所有使用 Atomic 类的场景都隐含了 CAS 的参与:

  1. 计数器

    // 高并发环境下的计数器
    AtomicInteger count = new AtomicInteger();
    count.incrementAndGet(); // 内部用CAS保证线程安全
    
  2. 状态标志

    // 无锁实现状态切换
    AtomicBoolean isRunning = new AtomicBoolean(true);
    isRunning.compareAndSet(true, false); // CAS更新
    
  3. 非阻塞算法

    // 实现无锁栈(Treiber算法)
    AtomicReference<Node<E>> top = new AtomicReference<>();
    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead)); // CAS更新栈顶
    }
    

核心源码

1、AtomicInteger实现原理

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 获取Unsafe实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            // 获取value字段的内存偏移地址
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    // CAS核心方法
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    // 原子自增
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

2、Unsafe类的CAS操作

Atomic类底层依赖 sun.misc.Unsafe 类的CAS操作:

// Unsafe类提供了多种CAS方法
// 对象字段的CAS操作
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

参数说明:

  • o: 要操作的对象
  • offset: 字段在对象中的偏移量(内存地址)
  • expected: 期望的当前值
  • x: 要设置的新值

基础使用

计数器场景

// 传统同步方式
class Counter {
    private int count = 0;
    public synchronized void increment() {
        count++;
    }
}

// Atomic方式
class AtomicCounter {
    private AtomicInteger count = new AtomicInteger(0);
    public void increment() {
        count.incrementAndGet();
    }
    // 更灵活的操作
    public void add(int delta) {
        count.getAndAdd(delta);
    }
}

状态标志控制

import java.util.concurrent.atomic.AtomicBoolean;

// 系统开关控制
public class FeatureToggle {
    private AtomicBoolean enabled = new AtomicBoolean(false);

    public void enable() {
        enabled.set(true);
    }

    public void disable() {
        enabled.set(false);
    }

    public boolean isEnabled() {
        return enabled.get();
    }

    // 安全切换状态
    public boolean toggle() {
        boolean prev;
        do {
            prev = enabled.get();
        } while (!enabled.compareAndSet(prev, !prev));
        return !prev;
    }
}

库存扣减(电商场景)

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class InventorySystem {
    private AtomicInteger stock = new AtomicInteger(1000); // 总库存

    public boolean deductStock(int quantity) {
        int current;
        do {
            current = stock.get();
            if (current < quantity) {
                return false; // 库存不足
            }
        } while (!stock.compareAndSet(current, current - quantity));

        // 扣减成功,处理订单逻辑

        return true;
    }

    // 测试用例
    public static void main(String[] args) {
        InventorySystem inventory = new InventorySystem();
        // 模拟100个线程并发扣减
        IntStream.range(0, 100).parallel().forEach(i -> {
            if (inventory.deductStock(1)) {
                System.out.println("扣减成功");
            }
        });
        System.out.println("剩余库存: " + inventory.stock.get());
    }
}

序列号生成(分布式ID)

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SequenceGenerator {
    private final AtomicLong sequence = new AtomicLong(0);

    public long nextId() {
        return sequence.getAndIncrement();
    }

    // 生成带时间戳的订单ID(格式:时间戳 + 6位序列号)
    public String generateOrderId() {
        long timestamp = System.currentTimeMillis();
        long seq = nextId();
        return String.format("%d%06d", timestamp, seq % 1_000_000);
    }
}

class SequenceGeneratorTest {
    private static final SequenceGenerator generator = new SequenceGenerator();
    private static final Set<String> generatedIds = ConcurrentHashMap.newKeySet(); // 线程安全的Set
    private static final int THREAD_COUNT = 100; // 并发线程数
    private static final int REQUESTS_PER_THREAD = 1000; // 每个线程生成ID的数量

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long startTime = System.currentTimeMillis();

        // 提交任务
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < REQUESTS_PER_THREAD; j++) {
                        String orderId = generator.generateOrderId();
                        if (!generatedIds.add(orderId)) {
                            System.err.println("⚠️ 重复ID: " + orderId);
                        }
                    }
                } finally {
                    latch.countDown();
                }
            });
        }

        // 等待所有线程完成
        latch.await();
        executor.shutdown();

        long totalTime = System.currentTimeMillis() - startTime;
        int totalIds = THREAD_COUNT * REQUESTS_PER_THREAD;

        System.out.println("测试完成!");
        System.out.println("生成ID总数: " + totalIds);
        System.out.println("唯一ID数量: " + generatedIds.size());
        System.out.println("是否有重复: " + (generatedIds.size() != totalIds ? "是" : "否"));
        System.out.println("总耗时: " + totalTime + "ms");
        System.out.println("QPS: " + (totalIds * 1000L / totalTime));
    }
}

实时报价系统

public class PriceUpdater {
    private AtomicReference<BigDecimal> currentPrice = 
        new AtomicReference<>(BigDecimal.ZERO);

    // 多个行情源同时更新价格
    public void updatePrice(BigDecimal newPrice) {
        BigDecimal current;
        do {
            current = currentPrice.get();
            if (newPrice.compareTo(current) <= 0) {
                return; // 新价格不比当前高,不更新
            }
        } while (!currentPrice.compareAndSet(current, newPrice));

        // 价格更新通知
        notifySubscribers(newPrice);
    }

    private void notifySubscribers(BigDecimal price) {
        // 通知订阅者逻辑...
    }
}

CAS的局限性及解决方案

ABA问题

问题描述:线程1读取值为A,线程2将值改为B后又改回A,此时线程1的CAS操作仍会成功,可能导致逻辑错误。

AtomicReference<Integer> account = new AtomicReference<>(100);

// 线程1 (会遭遇ABA问题)
new Thread(() -> {
    int current = account.get(); // 读取100
    System.out.println("线程1读取: " + current);

    // 模拟处理其他事情时被挂起
    try { Thread.sleep(1000); } catch (InterruptedException e) {}

    // 尝试更新
    boolean success = account.compareAndSet(current, 200);
    System.out.println("线程1更新结果: " + success + ",当前值: " + account.get());
}).start();

// 线程2 (制造ABA场景)
new Thread(() -> {
    // 第一次修改:100→50
    boolean success1 = account.compareAndSet(100, 50);
    System.out.println("线程2第一次修改: " + success1 + ",当前值: " + account.get());

    // 第二次修改:50→100
    boolean success2 = account.compareAndSet(50, 100);
    System.out.println("线程2第二次修改: " + success2 + ",当前值: " + account.get());
}).start();

银行账户转账中的ABA问题

在银行转账场景中,假设账户余额为100元:

  1. 线程A读取余额为100,准备转账50元
  2. 线程A被挂起
  3. 线程B转账50元,余额变为50
  4. 线程C存入50元,余额又变回100
  5. 线程A恢复执行,发现余额还是100,认为没有变化,执行转账操作

这样会导致账户实际上被扣款两次(线程B和线程A都扣款了50元),但余额看起来只减少了50元。

解决方案:使用 AtomicStampedReferenceAtomicMarkableReference 添加版本号控制。

AtomicStampedReference 通过添加一个版本号(stamp)来解决ABA问题。每次修改都会递增版本号,CAS操作不仅要比较值,还要比较版本号。

import java.util.concurrent.atomic.AtomicStampedReference;

// 银行账户转账ABA问题示例
public class Account {
    // 使用AtomicStampedReference包装余额,初始值100,初始版本号0
    private AtomicStampedReference<Integer> balance = 
        new AtomicStampedReference<>(100, 0);

    public boolean transfer(int amount) {
        int[] stamp = new int[1]; // 用于保存当前版本号的数组
        int current;
        do {
            // 获取当前值和版本号
            current = balance.get(stamp);
            if (current < amount) {
                return false; // 余额不足,转账失败
            }
        } while (!balance.compareAndSet(
            current, current - amount,  // 期望值和新值
            stamp[0], stamp[0] + 1)); // 期望版本号和新版本号
        return true;
    }
}
  • 使用 get(stamp) 同时获取当前值和版本号
  • compareAndSet() 只有当值和版本号都匹配时才更新

AtomicStampedReferenceAtomicMarkableReference 都是 Java 并发包中用于解决 ABA 问题 的原子引用类

特性 AtomicStampedReference AtomicMarkableReference
版本控制方式 使用 整数版本号(stamp)(每次修改递增) 使用 布尔标记(mark)(true/false)
适用场景 需要严格版本控制的场景(如银行交易) 只需知道”是否被修改过”的场景(如缓存标记)
ABA 问题解决方案 通过版本号检测中间变化 通过布尔标记检测是否被修改过
典型用例 账户余额、计数器 对象状态标记、缓存失效机制

AtomicStampedReference 适用场景:

  • 银行账户余额(防止 ABA 问题)
  • 计数器(需要知道修改次数)
  • 任何需要 精确版本控制 的场景
AtomicStampedReference<Integer> account = new AtomicStampedReference<>(100, 0);

// 线程1:读取值和版本号
int[] stamp = new int[1];
int balance = account.get(stamp); // balance=100, stamp[0]=0

// 线程2:修改值并增加版本号
account.compareAndSet(100, 50, 0, 1); // 版本号 0→1

// 线程1尝试更新(失败,因为版本号已变)
boolean success = account.compareAndSet(100, 200, stamp[0], stamp[0] + 1);
// success = false(因为版本号已变为1,不匹配0)

AtomicMarkableReference 适用场景:

  • 缓存数据是否失效(true=失效,false=有效)
  • 对象是否被锁定(true=已锁定,false=未锁定)
  • 只需知道 “对象是否被修改过”,而不关心修改了多少次
AtomicMarkableReference<String> cache = new AtomicMarkableReference<>("data", false);

// 线程1:读取值和标记
boolean[] mark = new boolean[1];
String data = cache.get(mark); // data="data", mark[0]=false

// 线程2:修改值并设置标记为 true(表示已修改)
cache.compareAndSet("data", "newData", false, true);

// 线程1尝试更新(失败,因为标记已变)
boolean success = cache.compareAndSet("data", "updated", mark[0], true);
// success = false(因为标记已变为 true,不匹配 false)

如何选择?

场景 推荐类
需要严格版本控制(如金融交易) AtomicStampedReference
只需知道是否被修改过(如缓存失效) AtomicMarkableReference

自旋开销

问题描述:高并发场景下CAS失败率增加,线程会持续自旋,导致CPU空转。

public class Counter {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        int oldValue;
        do {
            oldValue = count.get();
        } while (!count.compareAndSet(oldValue, oldValue + 1));
    }
}
// 高并发时可能导致大量线程自旋

解决方案

  • 使用 LongAdder 替代 AtomicLong(Java 8+)
  • 引入适当的退避策略
// 高并发计数器优化
public class HitCounter {
    // 使用 LongAdder 作为计数器
    private LongAdder count = new LongAdder();

    // 增加计数
    public void hit() {
        count.increment();
    }

    // 获取总计数
    public long getCount() {
        return count.sum();
    }
}

LongAdder 采用了 分段累加(Cell 分散) 的策略:

  1. 基础值(base):初始计数值
  2. Cell 数组:当竞争激烈时,会创建多个 Cell,让不同线程更新不同的 Cell
  3. 最终求和sum() 方法返回 base + 所有 Cell 的和

退避算法(Exponential Backoff)

public class BackoffCounter {
    private AtomicInteger count = new AtomicInteger(0);
    // 最小退避时间(单位毫秒)
    private static final int MIN_DELAY = 1;
    // 最大退避时间(单位毫秒),防止退避时间无限增长
    private static final int MAX_DELAY = 100;

    public void increment() {
        int delay = MIN_DELAY; // 初始退避时间
        while (true) {
            int current = count.get();
            if (count.compareAndSet(current, current + 1)) {
                break; // CAS成功,退出循环
            }
            // CAS失败后等待随机时间
            try {
                Thread.sleep((long)(Math.random() * delay));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            delay = Math.min(delay * 2, MAX_DELAY); // 指数增加退避时间
        }
    }
}

指数退避算法:每次失败后,等待时间按指数增长:1, 2, 4, 8,…直到MAX_DELAY

只能保证单个变量原子性

问题描述
CAS无法保证多个变量的原子更新。

解决方案

  • 使用AtomicReference包装多个变量
  • 使用锁机制
// 原子更新多个变量
public class Point {
    private static class State {
        final int x;
        final int y;
        State(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    // 原子状态引用
    private AtomicReference<State> state = 
        new AtomicReference<>(new State(0, 0));

    // 原子更新方法
    public void move(int deltaX, int deltaY) {
        State current;
        State newState;
        do {
            current = state.get(); // 获取当前状态
            newState = new State(  // 创建新状态
                current.x + deltaX, 
                current.y + deltaY
            );
        } while (!state.compareAndSet(current, newState));
    }

    public int getX() {
        return state.get().x;
    }
    public int getY() {
        return state.get().y;
    }
}
  • State类被设计为不可变(immutable)的,所有字段都是final的,一旦创建就不能修改

性能对比与选型建议

1、Atomic vs Synchronized

// 性能测试对比
public class PerformanceTest {
    private AtomicInteger atomicCounter = new AtomicInteger(0);
    private int syncCounter = 0;

    @Test
    public void testPerformance() {
        // Atomic测试
        long atomicTime = test(() -> {
            atomicCounter.incrementAndGet();
        });

        // Synchronized测试
        long syncTime = test(() -> {
            synchronized (this) {
                syncCounter++;
            }
        });

        System.out.printf("Atomic: %dms, Synchronized: %dms%n", 
            atomicTime, syncTime);
    }

    private long test(Runnable task) {
        long start = System.currentTimeMillis();
        IntStream.range(0, 1_000_000)
            .parallel()
            .forEach(i -> task.run());
        return System.currentTimeMillis() - start;
    }
}

2、选型建议

场景 推荐方案 理由
简单计数器 AtomicInteger/Long 实现简单,性能好
高并发统计 LongAdder 减少CAS竞争
复杂对象更新 AtomicReference 保证引用原子性
需要版本控制 AtomicStampedReference 解决ABA问题
多变量原子更新 锁机制 CAS无法直接支持

YOLO