ReentrantLock

ReentrantLock 是 Java 并发工具包 (java.util.concurrent.locks) 中的一个重要类,它实现了 Lock 接口,提供了比 synchronized 更灵活、更强大的锁机制。

与synchronized的对比

特性 ReentrantLock synchronized
实现机制 JDK层面实现 JVM层面实现
锁获取方式 显式lock()/unlock() 隐式获取和释放
可中断性 支持lockInterruptibly() 不支持
公平性 可配置公平/非公平模式 完全非公平
条件变量 支持多个Condition 单个等待队列
性能 高竞争下表现更好 Java 6后优化相当
锁绑定 一个锁可绑定多个条件队列 锁与条件队列一对一绑定

核心方法:

public class ReentrantLock implements Lock {
    // 获取锁(不可中断)
    public void lock();

    // 尝试获取锁(立即返回)
    public boolean tryLock();

    // 尝试获取锁(带超时)
    public boolean tryLock(long timeout, TimeUnit unit);

    // 获取锁(可中断)
    public void lockInterruptibly();

    // 释放锁
    public void unlock();

    // 创建条件变量
    public Condition newCondition();
}

基础使用

1、锁的获取和释放必须成对出现

ReentrantLock lock = new ReentrantLock();

lock.lock();  // 必须在try块外获取锁
try {
    // 临界区代码
} finally {
    lock.unlock();  // 确保锁释放
}
  • 锁的获取(lock()) 必须在 try 外。如果在 try 内加锁,可能锁未成功但 finally 仍会执行 unlock()

    对比 synchronizedsynchronized 是 Java 内置锁,无需手动释放,因此不会存在此问题:

    synchronized (lockObject) {  // 自动加锁/解锁
        // 临界区代码
    }
    

ReentrantLock 是显式锁,必须严格遵循 lock-unlock 的配对规则

2、可中断的加锁(避免死锁):

if (lock.tryLock(1, TimeUnit.SECONDS)) { // 尝试获取锁,超时放弃
    try {
        // 临界区代码
    } finally {
        lock.unlock();
    }
} else {
    // 处理加锁失败
}

场景案例

lock()的使用

账户转账案例

public class BankAccount {
    private final ReentrantLock lock = new ReentrantLock();
    private int balance;

    public void transfer(BankAccount to, int amount) {
        // 解决死锁:按hash排序获取锁
        BankAccount first = this.hashCode() < to.hashCode() ? this : to;
        BankAccount second = first == this ? to : this;

        // 按顺序加锁
        first.lock.lock(); // 先锁第一个账户
        try {
            second.lock.lock(); // 再锁第二个账户
            try {
                // 检查余额并转账
                if (this.balance >= amount) {
                    this.balance -= amount;
                    to.balance += amount;
                }
            } finally {
                second.lock.unlock(); // 释放第二个锁
            }
        } finally {
            first.lock.unlock(); // 释放第一个锁
        }
    }
}
  • 通过 锁排序(Lock Ordering) 避免死锁:按对象的哈希值排序,决定加锁顺序,所有线程都按 first → second 的顺序加锁,避免循环等待

    如果 hashCode 相同,仍然可能死锁(但概率极低),可以用 System.identityHashCode() 替代:

    BankAccount first = System.identityHashCode(this) < System.identityHashCode(to) ? this : to;
    
  • 如果账户可能动态变化,可以用 UUID 或数据库 ID 排序:

    BankAccount first = this.getId().compareTo(to.getId()) < 0 ? this : to;
    

System.identityHashCodehashCode 比较:

对比项 hashCode() System.identityHashCode()
是否可重写 是(可能人为导致冲突) 否(始终返回 JVM 内部标识)
冲突概率 高(如 String 哈希算法可能碰撞) 极低(依赖对象内存地址)
适用场景 需要语义相等的对象(如 HashMap 需要唯一对象标识(如锁排序、调试)

lockInterruptibly()

可中断锁(死锁处理)

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockDemo {
    private final ReentrantLock lock1 = new ReentrantLock();
    private final ReentrantLock lock2 = new ReentrantLock();

    public void threadA() {
        try {
            lock1.lockInterruptibly(); // 可中断获取锁1
            try {
                Thread.sleep(100); // 模拟工作耗时
                lock2.lockInterruptibly(); // 可中断获取锁2
                try {
                    System.out.println("ThreadA完成任务");
                } finally {
                    lock2.unlock(); // 释放锁2
                }
            } finally {
                lock1.unlock(); // 释放锁1
            }
        } catch (InterruptedException e) {
            System.out.println("ThreadA被中断"); // 响应中断
        }
    }

    // threadB和threadA类似但以相反顺序获取锁
    public void threadB() {
        try {
            lock2.lockInterruptibly();  // 先获取锁2
            try {
                Thread.sleep(100);      // 模拟工作耗时
                lock1.lockInterruptibly(); // 再尝试获取锁1
                try {
                    System.out.println("ThreadB完成任务");
                } finally {
                    lock1.unlock();
                }
            } finally {
                lock2.unlock();
            }
        } catch (InterruptedException e) {
            System.out.println("ThreadB被中断");
        }
    }

    public static void main(String[] args) {
        InterruptibleLockDemo demo = new InterruptibleLockDemo();
        Thread t1 = new Thread(demo::threadA);
        Thread t2 = new Thread(demo::threadB);

        // threadA 先获取 lock1,再尝试获取 lock2
        // threadB 先获取 lock2,再尝试获取 lock1
        // 两个线程同时执行,会互相等待对方的锁,导致死锁
        t1.start();
        t2.start();

        // 中断死锁
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                t1.interrupt(); // 中断线程1
                t2.interrupt(); // 中断线程2
                timer.cancel(); // 关闭定时器
            }
        }, 2000); // 2秒后中断线程
    }
}
  • lockInterruptibly():如果线程在等待锁时被中断,会立即抛出 InterruptedException
  • Timer2秒后 触发,无条件中断 这两个线程(无论是否真的死锁)。线程被中断后,lockInterruptibly() 会抛出 InterruptedException,释放已持有的锁,从而解除死锁
    • 调用 timer.cancel() 停止 Timer,避免重复中断

为什么用 lockInterruptibly()

  • 避免永久阻塞:普通 lock() 会无视中断请求,导致线程无法退出;lockInterruptibly() 如果线程在等待锁时被中断,会立即抛出 InterruptedException
  • 适合需要响应中断的场景:如任务超时、系统关闭等。

公平锁

公平锁(订单处理系统)

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

// 订单处理系统,使用公平锁来保证多线程环境下的订单处理顺序
public class OrderProcessingSystem {
    private final ReentrantLock lock = new ReentrantLock(true); // 公平锁

    public void processOrder(Order order) {
        System.out.println(Thread.currentThread().getName() +
                " 尝试获取锁,时间: " + System.currentTimeMillis());
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()
                    + " 处理订单: " + order.getId());
            // 模拟处理时间
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断状态
            System.out.println(Thread.currentThread().getName() + " 被中断,取消订单处理");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        OrderProcessingSystem system = new OrderProcessingSystem();
        CountDownLatch lockHeldSignal = new CountDownLatch(1); // 信号量:锁已被占用

        // 先启动一个线程,长时间持有锁
        new Thread(() -> {
            system.lock.lock(); // 直接占锁,不执行订单
            System.out.println("Blocking-Thread 已持有锁,迫使其他线程排队");
            lockHeldSignal.countDown(); // 通知主线程:锁已被占用
            try {
                Thread.sleep(2000); // 模拟长时间持有锁
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                system.lock.unlock();
            }
        }, "Blocking-Thread").start();

        // 确保锁已被占用后再启动其他线程
        try {
            lockHeldSignal.await(); // 阻塞,直到锁被占用
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 此时锁已被占用,公平锁会按顺序分配
        ExecutorService executor = Executors.newFixedThreadPool(10);
        IntStream.range(0, 10).forEach(i -> {
            executor.submit(() -> {
                system.processOrder(new Order(i));
            });
        });
        executor.shutdown();
    }
}

class Order {
    private final int id;
    public Order(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
}

ReentrantLock(true)

  • 使用 公平锁fair=true),确保线程按请求锁的顺序获取锁(避免线程饥饿)。
  • 如果是非公平锁(默认),线程可能会插队,导致某些线程长时间等待。

ReentrantLock + Condition

生产者消费者模式

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition(); // 队列未满条件
    private final Condition notEmpty = lock.newCondition(); // 队列非空条件
    private final Object[] items; // 环形队列
    private int putPtr, takePtr, count; // 写入指针、读取指针、当前元素数

    public MessageQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();  // 队列满时等待
            }
            items[putPtr] = x; // 写入数据
            if (++putPtr == items.length) putPtr = 0; // 环形指针处理
            count++;
            notEmpty.signal();  // 唤醒一个消费者线程
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) { // 队列空时等待
                notEmpty.await();  // 释放锁并阻塞
            }
            @SuppressWarnings("unchecked")
            T x = (T) items[takePtr]; // 读取数据
            if (++takePtr == items.length) takePtr = 0; // 环形指针处理
            count--;
            notFull.signal();  // 唤醒一个生产者线程
            return x;
        } finally {
            lock.unlock();
        }
    }
}
class MessageQueueTest {
    public static void main(String[] args) throws InterruptedException {
        MessageQueue<String> queue = new MessageQueue<>(5);  // 容量为5的队列

        // 生产者线程
        ExecutorService producer = Executors.newSingleThreadExecutor();
        producer.submit(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put("消息-" + i);
                    System.out.println("生产: 消息-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 消费者线程
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        consumer.submit(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String msg = queue.take();
                    System.out.println("消费: " + msg);
                    Thread.sleep(200);  // 模拟处理耗时
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.shutdown();
        consumer.shutdown();
        producer.awaitTermination(10, TimeUnit.SECONDS);
        consumer.awaitTermination(10, TimeUnit.SECONDS);
    }
}
  1. ReentrantLock互斥访问共享资源

    • 保护共享变量items(队列数组)、putPtr(写入指针)、takePtr(读取指针)、count(当前元素数量)的读写操作必须互斥。

    • 保证 puttake 操作的原子性,防止并发修改导致数据不一致。

  2. 条件变量(Condition)实现精准唤醒

    • notFull:队列未满时唤醒生产者线程(put 方法)。

    • notEmpty:队列非空时唤醒消费者线程(take 方法)。

    • 避免虚假唤醒:通过 while 循环重复检查条件(如 while (count == 0))。

  3. 阻塞操作:

    • 队列满时,put 方法阻塞生产者,直到有空位。
    • 队列空时,take 方法阻塞消费者,直到有新数据。

完整工作流程

生产者线程(put)

sequenceDiagram
    participant Producer
    participant Lock
    participant notFull
    participant Queue

    Producer->>Lock: lock()
    alt 队列已满
        Producer->>notFull: await()
        notFull-->>Producer: 阻塞并释放锁
    else 队列未满
        Producer->>Queue: 写入数据
        Producer->>notEmpty: signal()
    end
    Producer->>Lock: unlock()

消费者线程(take)

sequenceDiagram
    participant Consumer
    participant Lock
    participant notEmpty
    participant Queue

    Consumer->>Lock: lock()
    alt 队列为空
        Consumer->>notEmpty: await()
        notEmpty-->>Consumer: 阻塞并释放锁
    else 队列非空
        Consumer->>Queue: 读取数据
        Consumer->>notFull: signal()
    end
    Consumer->>Lock: unlock()

线程交替执行

银行叫号系统

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 银行票务系统,通过 ReentrantLock 和 Condition 实现 VIP客户优先 的业务办理逻辑:
 *  - VIP客户:优先办理,处理完成后切换回普通模式。
 *  - 普通客户:按顺序办理,每处理3个后切换VIP模式。
 */
public class BankTicketSystem {
    private final ReentrantLock lock = new ReentrantLock();
    // VIP客户的等待队列(当 isVipTurn=false 时阻塞)
    private final Condition vipCondition = lock.newCondition();
    // 普通客户的等待队列(当 isVipTurn=true 时阻塞)
    private final Condition normalCondition = lock.newCondition();
    // 当前是否优先处理VIP客户
    private boolean isVipTurn = false;

    // VIP客户流程
    public void vipCustomer(int num) throws InterruptedException {
        lock.lock();
        try {
            while (!isVipTurn) { // 如果不是VIP轮次,释放锁并阻塞
                vipCondition.await();
            }
            System.out.println("VIP客户-" + num + "正在办理业务");
            Thread.sleep(500); // 模拟业务处理
            isVipTurn = false; // 切换为普通模式
            normalCondition.signalAll(); // 唤醒所有普通客户
        } finally {
            lock.unlock();
        }
    }

    // 普通客户流程
    public void normalCustomer(int num) throws InterruptedException {
        lock.lock();
        try {
            while (isVipTurn) {
                normalCondition.await();
            }
            System.out.println("普通客户-" + num + "正在办理业务");
            Thread.sleep(300); // 模拟业务处理
            if (num % 3 == 0) { // 每处理3个普通客户切换VIP
                isVipTurn = true;
                vipCondition.signal(); // 唤醒一个VIP客户
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BankTicketSystem system = new BankTicketSystem();

        // VIP客户线程(每隔一段时间尝试办理)
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    system.vipCustomer(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        // 普通客户线程(持续办理)
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    system.normalCustomer(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
  • signal() vs signalAll():VIP客户唤醒一个即可,普通客户需唤醒所有(避免遗漏)

大致流程:

普通客户-1 办理
普通客户-2 办理
普通客户-3 办理 → 切换VIP模式
VIP客户-1 办理 → 切换普通模式
普通客户-4 办理
...

交易系统订单匹配引擎

/**
 * 订单撮合引擎 的简化实现,用于模拟金融市场中的买卖订单匹配逻辑。
 * 核心功能包括:
 * - 订单管理:按价格优先级维护买卖订单队列。
 * - 撮合引擎:持续匹配符合价格的买卖订单。
 * - 线程安全:通过 ReentrantLock 和 Condition 保证并发安全。
 */
public class OrderMatchingEngine {
    private final ReentrantLock lock = new ReentrantLock();
    // 通知机制,当新订单到达时唤醒撮合线程
    private final Condition orderAdded = lock.newCondition();
    // 买方订单队列,按价格 降序 排列(高价优先)
    private final PriorityQueue<Order> buyOrders = new PriorityQueue<>(
            (a, b) -> Double.compare(b.getPrice(), a.getPrice()));
    // 卖方订单队列,按价格 升序 排列(低价优先)
    private final PriorityQueue<Order> sellOrders = new PriorityQueue<>(
            Comparator.comparingDouble(Order::getPrice));

    // 添加订单
    public void addOrder(Order order) {
        lock.lock();
        try {
            if (order.getType() == OrderType.BUY) {
                buyOrders.add(order);
            } else {
                sellOrders.add(order);
            }
            orderAdded.signalAll(); // 唤醒撮合线程
        } finally {
            lock.unlock();
        }
    }

    // 订单撮合逻辑
    public void matchOrders() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                // 检查是否可撮合
                // 如果买方或卖方队列为空,或最高买价 < 最低卖价 则无法成交
                while (buyOrders.isEmpty() || sellOrders.isEmpty()
                        || buyOrders.peek().getPrice() < sellOrders.peek().getPrice()) {
                    orderAdded.await(); // 阻塞等待新订单
                }

                // 执行撮合
                // 取出队列头部订单(最高买价和最低卖价)成交,计算成交价(买卖均价的中间价)和成交量(较小数量)
                Order buy = buyOrders.poll();
                Order sell = sellOrders.poll();
                double price = (buy.getPrice() + sell.getPrice()) / 2; // 中间价成交
                // 取买卖双方数量的较小值,确保成交不超过任一方意愿
                double quantity = Math.min(buy.getQuantity(), sell.getQuantity());

                System.out.printf("成交: %.2f @ %.2f%n", quantity, price);

                // 处理剩余数量
                if (buy.getQuantity() > quantity) {
                    buy.setQuantity(buy.getQuantity() - quantity);
                    buyOrders.add(buy); // 放回剩余部分
                }
                if (sell.getQuantity() > quantity) {
                    sell.setQuantity(sell.getQuantity() - quantity);
                    sellOrders.add(sell);
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
class Order {
    private final OrderType type; // BUY 或 SELL
    private final double price;
    private double quantity; // 股票数量

    // Constructor, Getters, Setters...
}

enum OrderType { BUY, SELL }
  • PriorityQueue<Order> 优先队列,按价格排序,分别存储买方和卖方的未成交订单,确保高效撮合

阻塞唤醒机制

  • 等待条件:当无法撮合时,调用 orderAdded.await() 释放锁并阻塞。
  • 唤醒条件addOrder 添加新订单后调用 orderAdded.signalAll()

锁分段技术

库存系统

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// 分段锁(Striped Lock) 实现线程安全库存系统
// 通过 数据分片 减少锁竞争,提高并发性能
public class SegmentInventory {
    // 每个段包含的item数量(向上取整)
    private final int segmentSize;
    // 锁数组,每个段一个独立锁
    private final ReentrantLock[] locks;
    // 库存数据数组,counts[itemId] 表示对应商品的库存数
    private final int[] counts;

    /**
     * @param itemCount        商品总数
     * @param concurrencyLevel 并发段数
     */
    public SegmentInventory(int itemCount, int concurrencyLevel) {
        // 计算每个段的大小
        this.segmentSize = (itemCount + concurrencyLevel - 1) / concurrencyLevel;
        this.locks = new ReentrantLock[concurrencyLevel];
        this.counts = new int[itemCount];
        Arrays.setAll(locks, i -> new ReentrantLock()); // 初始化锁数组
    }

    // 增加库存
    public void increase(int itemId) {
        int segment = itemId / segmentSize; // 计算所属段
        locks[segment].lock(); // 获取分段锁
        try {
            counts[itemId]++; // 修改库存
        } finally {
            locks[segment].unlock(); // 释放锁
        }
    }

    // 查询库存
    public int getCount(int itemId) {
        int segment = itemId / segmentSize;
        locks[segment].lock();
        try {
            return counts[itemId]; // 返回库存
        } finally {
            locks[segment].unlock();
        }
    }
}
class SegmentInventoryTest {
    public static void main(String[] args) throws InterruptedException {
        // 分段锁性能验证。分段锁性能优于全局锁
//        testPerformance();
        // 分段冲突测试。同段操作串行,不同段并发
        testSegmentConflict();
    }

    static void testPerformance() throws InterruptedException {
        int itemCount = 1000;
        int threads = 16;

        // 测试全局锁性能
        long start1 = System.currentTimeMillis();
        testWithGlobalLock(itemCount, threads);
        long duration1 = System.currentTimeMillis() - start1;

        // 测试分段锁性能
        long start2 = System.currentTimeMillis();
        testWithSegmentLock(itemCount, threads);
        long duration2 = System.currentTimeMillis() - start2;

        System.out.println("全局锁耗时: " + duration1 + "ms");
        System.out.println("分段锁耗时: " + duration2 + "ms");
    }
    static void testWithGlobalLock(int itemCount, int threads) throws InterruptedException {
        ReentrantLock globalLock = new ReentrantLock();
        int[] counts = new int[itemCount];
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < 10000; i++) {
            int itemId = i % itemCount;
            executor.submit(() -> {
                globalLock.lock();
                try {
                    counts[itemId]++;
                } finally {
                    globalLock.unlock();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    static void testWithSegmentLock(int itemCount, int threads) throws InterruptedException {
        SegmentInventory inventory = new SegmentInventory(itemCount, 16); // 16个分段
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < 10000; i++) {
            int itemId = i % itemCount;
            executor.submit(() -> inventory.increase(itemId));
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    static void testSegmentConflict() throws InterruptedException {
        SegmentInventory inventory = new SegmentInventory(100, 4); // 段大小=25
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 两个线程操作同段的item(26和30属于段1)
        executor.submit(() -> {
            for (int i = 0; i < 1000; i++) inventory.increase(26);
        });
        executor.submit(() -> {
            for (int i = 0; i < 1000; i++) inventory.increase(30);
        });

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("商品26库存: " + inventory.getCount(26)); // 预期1000
        System.out.println("商品30库存: " + inventory.getCount(30)); // 预期1000
    }
}
  • 操作 itemId=5itemId=30 会分别锁定 段0段1,互不阻塞。
  • 操作 itemId=10itemId=20 均属于 段0,会互斥阻塞。
  1. segmentSize 计算公式:segmentSize = (itemCount + concurrencyLevel - 1) / concurrencyLevel;,通过 (itemCount + concurrencyLevel - 1) 调整分子,确保所有库存项被覆盖:

    • itemCount 不能被 concurrencyLevel 整除时,余数部分会使得分子足够大,从而让整数除法结果增加 1
    • 如果整除,则 + concurrencyLevel - 1 不会改变结果。
  2. 分段映射规则:int segment = itemId / segmentSize;

    为什么分段映射用除法(/)而不是取模(%)?

    • 使用除法的目的:将连续的 itemId 分配到同一个分段,保证 局部性(Locality),减少锁切换开销。

    • 用取模会破坏局部性,取模会导致相邻的 itemId 被分散到不同分段,增加锁竞争概率。

    • 除法 vs 取模的对比

      | 操作 | 效果 | 适用场景 |
      | :——- | :—————————– | :————————————— |
      | / | 按范围分片(连续ID归到同一段) | 需要局部性优化的场景(如相邻ID高频访问) |
      | % | 哈希分片(ID均匀分散到各段) | 需要绝对均匀分布的场景 |

与全局锁比较:

场景 全局锁(synchronized) 分段锁(SegmentInventory)
操作不同段的item 串行执行 并行执行(无竞争)
操作同段的item 串行执行 串行执行(段内竞争)
锁竞争概率 低(与 concurrencyLevel 相关)
适用场景 低并发 高并发(如电商库存系统)

锁降级

缓存系统

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// 基于 读写锁(ReentrantReadWriteLock) 的线程安全缓存系统
public class CacheSystem<K, V> {
    // 提供读写锁分离机制
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock readLock = rwl.readLock(); // 共享锁,允许多线程并发读
    private final Lock writeLock = rwl.writeLock(); // 独占锁,写操作或缓存更新时使用
    private Map<K, V> cache = new HashMap<>();

    public V get(K key) {
        // 第一阶段:尝试读缓存(共享锁)
        readLock.lock();
        try {
            V value = cache.get(key);
            if (value != null) {
                return value; // 缓存命中直接返回
            }
        } finally {
            readLock.unlock();
        }

        // 第二阶段:缓存未命中,加写锁
        writeLock.lock();
        try {
            // 双重检查,防止其他线程已更新
            V value = cache.get(key);
            if (value == null) {
                value = loadFromDatabase(key); // 模拟数据库加载
                cache.put(key, value);
            }
            // 锁降级:在释放写锁前获取读锁
            readLock.lock();
            return value;
        } finally {
            writeLock.unlock(); // 释放写锁,保持读锁
            // 此处仍持有读锁,保护后续操作
        }
    }

    private V loadFromDatabase(K key) {
        // 模拟数据库加载
        return null;
    }
}

关键点:

  1. 双重检查的必要性

    writeLock.lock();
    try {
        // 再次检查缓存
        V value = cache.get(key);
        if (value == null) {
            value = loadFromDatabase(key);
            cache.put(key, value);
        }
        // ...
    }
    
    • 目的:防止多个线程同时通过第一次检查后,重复加载数据(缓存击穿)。
    • 效果:只有第一个拿到写锁的线程会执行加载,后续线程直接读取缓存。
  2. 锁降级(Lock Downgrading)

    writeLock.lock();
    try {
        // ... 写操作
        readLock.lock();  // 锁降级
    } finally {
        writeLock.unlock(); // 释放写锁,保持读锁
    }
    
    • 作用:

      • 保证在写锁释放后,其他线程无法修改数据(因为仍持有读锁)。

      • 避免数据在返回过程中被其他写操作破坏一致性。

    • 注意事项:

      • 锁降级 必须 在写锁未释放前获取读锁,顺序不能颠倒。
      • 不支持锁升级(读锁 → 写锁),会导致死锁。

Q:锁降级是否必要?

如果去掉锁降级:

writeLock.lock();
try {
    V value = cache.get(key);
    if (value == null) {
        value = loadFromDatabase(key);
        cache.put(key, value);
    }
    return value; // 直接返回,不持有读锁
} finally {
    writeLock.unlock();
}

writeLock.unlock() 和返回结果之间,其他线程可能修改数据,导致返回脏数据。

分布式锁(基于Redis+ReentrantLock)

public class DistributedLock {
    private final ReentrantLock localLock = new ReentrantLock();
    private final RedisTemplate<String, String> redisTemplate;
    private final String lockKey;
    private final String clientId = UUID.randomUUID().toString();
    private final long expireTime = 30000; // 30秒

    public boolean tryLock(long waitTime) throws InterruptedException {
        long start = System.currentTimeMillis();

        // 先获取本地锁
        if (!localLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
            return false;
        }

        try {
            // 循环尝试获取分布式锁
            do {
                Boolean success = redisTemplate.opsForValue().setIfAbsent(
                    lockKey, clientId, expireTime, TimeUnit.MILLISECONDS);
                if (Boolean.TRUE.equals(success)) {
                    return true;
                }

                // 检查锁是否过期
                String currentValue = redisTemplate.opsForValue().get(lockKey);
                if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) {
                    // 锁已过期,尝试抢占
                    String oldValue = redisTemplate.opsForValue().getAndSet(
                        lockKey, clientId);
                    if (oldValue != null && oldValue.equals(currentValue)) {
                        return true;
                    }
                }

                if (System.currentTimeMillis() - start > waitTime) {
                    break;
                }

                Thread.sleep(100); // 短暂等待后重试
            } while (true);

            return false;
        } finally {
            localLock.unlock();
        }
    }

    public void unlock() {
        // 先检查本地锁
        if (!localLock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }

        try {
            // 验证是当前客户端持有的锁
            String value = redisTemplate.opsForValue().get(lockKey);
            if (clientId.equals(value)) {
                redisTemplate.delete(lockKey);
            }
        } finally {
            localLock.unlock();
        }
    }
}

常见问题与解决方案

死锁预防

问题代码

// 线程1
lockA.lock();
lockB.lock();

// 线程2
lockB.lock();
lockA.lock();

解决方案

方案一 锁顺序一致性

  • 确保所有线程以相同的顺序获取多个锁:先 fromto,避免死锁(需全局统一顺序)
// 使用tryLock+定时回退
public boolean transfer(Account from, Account to, int amount) {
    long waitTime = 1000; // 1秒超时
    long endTime = System.nanoTime() + waitTime * 1000000;

    while (true) {
        if (from.lock.tryLock()) {
            try {
                if (to.lock.tryLock()) {
                    try {
                        // 执行转账
                        return true;
                    } finally {
                        to.lock.unlock();
                    }
                }
            } finally {
                from.lock.unlock();
            }
        }

        if (System.nanoTime() > endTime) {
            return false;
        }

        Thread.sleep(100); // 短暂休眠
    }
}

方案二 设置锁超时

  • 使用 tryLock(long timeout, TimeUnit unit) 方法设置获取锁的超时时间
if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        // 临界区代码
    } finally {
        lock.unlock();
    }
} else {
    // 处理无法获取锁的情况
}

锁粒度过大

问题代码

  • 使用一个锁保护过多的资源或操作,导致并发性能下降。
private final ReentrantLock lock = new ReentrantLock();

public void updateUser(User user) {
    lock.lock();
    try {
        // 更新用户基本信息
        updateBasicInfo(user);
        // 更新用户订单
        updateOrders(user);
        // 更新用户地址
        updateAddresses(user);
    } finally {
        lock.unlock();
    }
}

优化方案

  • 减小锁粒度,将一个大锁拆分为多个小锁,每个锁保护不同的资源
  • 使用分段锁。对于集合类数据,可以使用分段锁提高并发度
private final ReentrantLock basicInfoLock = new ReentrantLock();
private final ReentrantLock ordersLock = new ReentrantLock();
private final ReentrantLock addressesLock = new ReentrantLock();

public void updateUser(User user) {
    basicInfoLock.lock();
    try {
        updateBasicInfo(user);
    } finally {
        basicInfoLock.unlock();
    }

    ordersLock.lock();
    try {
        updateOrders(user);
    } finally {
        ordersLock.unlock();
    }

    addressesLock.lock();
    try {
        updateAddresses(user);
    } finally {
        addressesLock.unlock();
    }
}

锁泄漏问题

问题代码

  • 忘记释放锁,导致其他线程无法获取锁,程序挂起。
public class LockLeakExample {
    private final Lock lock = new ReentrantLock();

    public void riskyMethod() throws Exception {
        lock.lock(); // 获取锁
        if (someCondition()) {
            throw new Exception("Something went wrong");
            // 如果抛出异常,锁永远不会被释放
        }
        try {
            // 正常操作
        } finally {
            lock.unlock(); // 只有在不抛出异常时才会执行
        }
    }

    private boolean someCondition() {
        return Math.random() > 0.5;
    }
}

优化方案

  • 使用 try-finally 确保锁释放
  • 使用 Java 7 的 try-with-resources 模式
public class LockLeakSolution {
    private final Lock lock = new ReentrantLock();

    public void safeMethod() throws Exception {
        // 使用自定义的LockResource实现AutoCloseable
        try (LockResource lr = new LockResource(lock)) {
            if (someCondition()) {
                throw new Exception("Something went wrong");
            }
            // 正常操作
        } // 自动调用close()释放锁
    }

    private boolean someCondition() {
        return Math.random() > 0.5;
    }

    // 辅助类实现AutoCloseable
    private static class LockResource implements AutoCloseable {
        private final Lock lock;

        public LockResource(Lock lock) {
            this.lock = lock;
            lock.lock(); // 在构造时获取锁
        }

        @Override
        public void close() {
            lock.unlock(); // 在try-with-resources结束时自动释放锁
        }
    }
}

活锁问题

问题代码

  • 线程不断重试某个操作,但始终无法取得进展。
public class LiveLockExample {
    private final Lock lock1 = new ReentrantLock();
    private final Lock lock2 = new ReentrantLock();

    public void method1() {
        while (true) {
            if (lock1.tryLock()) {
                try {
                    if (lock2.tryLock()) {
                        try {
                            System.out.println("method1 成功");
                            return;
                        } finally {
                            lock2.unlock();
                        }
                    }
                } finally {
                    lock1.unlock();
                }
            }
            // 没有休眠,可能导致CPU高负载
        }
    }

    public void method2() {
        while (true) {
            if (lock2.tryLock()) {
                try {
                    if (lock1.tryLock()) {
                        try {
                            System.out.println("method2 成功");
                            return;
                        } finally {
                            lock1.unlock();
                        }
                    }
                } finally {
                    lock2.unlock();
                }
            }
            // 没有休眠,可能导致CPU高负载
        }
    }
}

优化方案

  • 引入随机退避:在重试时加入随机延迟
  • 限制重试次数:设置最大重试次数,超过后放弃或采取其他策略
public class LiveLockSolution {
    private final Lock lock1 = new ReentrantLock();
    private final Lock lock2 = new ReentrantLock();
    private final Random random = new Random();

    // 解决方案1: 引入随机退避
    public void method1WithBackoff() {
        while (true) {
            if (lock1.tryLock()) {
                try {
                    if (lock2.tryLock()) {
                        try {
                            System.out.println("method1 成功");
                            return;
                        } finally {
                            lock2.unlock();
                        }
                    }
                } finally {
                    lock1.unlock();
                }
            }
            // 随机休眠避免活锁
            try {
                Thread.sleep(random.nextInt(50));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // 解决方案2: 限制重试次数
    public void method2WithRetryLimit() {
        int retryCount = 0;
        final int maxRetries = 10;

        while (retryCount < maxRetries) {
            if (lock2.tryLock()) {
                try {
                    if (lock1.tryLock()) {
                        try {
                            System.out.println("method2 成功");
                            return;
                        } finally {
                            lock1.unlock();
                        }
                    }
                } finally {
                    lock2.unlock();
                }
            }

            retryCount++;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        System.out.println("method2 达到最大重试次数");
        // 执行备用方案
    }
}

饥饿问题

问题代码

  • 某些线程长时间无法获取锁,一直处于等待状态。
public class StarvationExample {
    private final Lock lock = new ReentrantLock(); // 非公平锁

    public void worker(String name) {
        while (true) {
            lock.lock();
            try {
                System.out.println(name + " 获得锁");
                Thread.sleep(1000); // 模拟工作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }

    // 多个线程竞争时,某些线程可能长时间无法获取锁
}

优化方案

  • 使用公平锁
  • 限制线程持有锁的时间:避免单个线程长时间占用锁
  • 优先级调整。适当调整线程优先级,但要注意过度使用可能导致其他问题
// 限制锁持有时间
public class TimeLimitedLockSolution {
    private final Lock lock = new ReentrantLock();
    private static final long MAX_LOCK_HOLD_TIME_MS = 500; // 最大持有锁时间500ms

    public void worker(String name) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 尝试获取锁,最多等待200ms
                if (lock.tryLock(200, TimeUnit.MILLISECONDS)) {
                    try {
                        long start = System.currentTimeMillis();
                        System.out.println(name + " 获得锁");

                        // 模拟工作,但不超过最大持有时间
                        long workTime = Math.min(1000, MAX_LOCK_HOLD_TIME_MS);
                        Thread.sleep(workTime);

                        // 检查是否已经持有锁过长时间
                        if (System.currentTimeMillis() - start > MAX_LOCK_HOLD_TIME_MS) {
                            System.out.println(name + " 即将超时,主动释放锁");
                            break; // 退出当前工作循环,释放锁
                        }
                    } finally {
                        lock.unlock();
                        System.out.println(name + " 释放锁");
                    }
                } else {
                    System.out.println(name + " 等待锁超时,将重试");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        TimeLimitedLockSolution example = new TimeLimitedLockSolution();

        // 创建多个工作线程
        for (int i = 1; i <= 5; i++) {
            String workerName = "Worker-" + i;
            new Thread(() -> example.worker(workerName)).start();
        }
    }
}
// 优先级调整
public class EnhancedPrioritySolution {
    private final Lock lock = new ReentrantLock(true); // 使用公平锁作为基础
    private static final long PRIORITY_BOOST_THRESHOLD_MS = 2000; // 等待超过2秒提升优先级
    private static final long MAX_WAIT_TIME_MS = 5000; // 最大等待时间5秒

    public void worker(String name, int basePriority) {
        if (basePriority < Thread.MIN_PRIORITY || basePriority > Thread.MAX_PRIORITY) {
            basePriority = Thread.NORM_PRIORITY;
        }

        Thread.currentThread().setPriority(basePriority);
        long waitStartTime = System.currentTimeMillis();
        boolean priorityBoosted = false;

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 计算剩余等待时间
                long waitedTime = System.currentTimeMillis() - waitStartTime;
                long remainingWaitTime = MAX_WAIT_TIME_MS - waitedTime;

                if (remainingWaitTime <= 0) {
                    System.out.printf("%s 等待超时(已等待%dms),放弃本次操作%n", 
                                     name, waitedTime);
                    break;
                }

                // 动态优先级调整:等待时间超过阈值时提升优先级
                if (!priorityBoosted && waitedTime > PRIORITY_BOOST_THRESHOLD_MS) {
                    int newPriority = Math.min(basePriority + 1, Thread.MAX_PRIORITY);
                    Thread.currentThread().setPriority(newPriority);
                    priorityBoosted = true;
                    System.out.printf("%s 已等待%dms,优先级从%d提升到%d%n",
                                    name, waitedTime, basePriority, newPriority);
                }

                // 尝试获取锁,带有超时
                if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                    try {
                        waitStartTime = System.currentTimeMillis(); // 重置等待时间
                        if (priorityBoosted) {
                            Thread.currentThread().setPriority(basePriority); // 恢复原始优先级
                            priorityBoosted = false;
                        }

                        System.out.printf("%s (优先级:%d) 获得锁,执行工作%n", 
                                        name, Thread.currentThread().getPriority());
                        Thread.sleep(800); // 模拟工作
                    } finally {
                        lock.unlock();
                        System.out.printf("%s 释放锁%n", name);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        EnhancedPrioritySolution example = new EnhancedPrioritySolution();

        // 创建多个工作线程,设置不同基础优先级
        for (int i = 1; i <= 5; i++) {
            String workerName = "Worker-" + i;
            int priority = i == 3 ? Thread.MAX_PRIORITY : // 一个高优先级线程
                          i == 5 ? Thread.MIN_PRIORITY : // 一个低优先级线程
                          Thread.NORM_PRIORITY;         // 其他普通优先级

            new Thread(() -> example.worker(workerName, priority), workerName).start();
        }
    }
}

最佳实践总结

  1. 总是使用 try-finally 释放锁,确保锁在任何情况下都能被释放
  2. 尽量减小锁的粒度,只锁定必要的资源
  3. 考虑使用更高级的同步工具,如 CountDownLatch、CyclicBarrier、Semaphore 等
  4. 避免在持有锁时调用外部方法,这可能导致死锁或性能问题
  5. 考虑使用并发集合,如 ConcurrentHashMap,而不是自己实现同步
  6. 在适当的时候使用 volatile 变量,对于简单的状态标志可能不需要完整同步
  7. 考虑无锁算法,如使用 Atomic 类(AtomicInteger, AtomicReference 等)

其他

Timer

Timer 是 Java 提供的一个 定时任务调度器,用于在 指定的时间固定的时间间隔 执行任务。timer.schedule()Timer 的核心方法,用于安排任务的执行。

Timer的基本使用

Timer timer = new Timer();  // 创建一个定时器
  • 守护线程 vs 用户线程

    • Timer 默认使用 用户线程(非守护线程),即使主线程结束,Timer 仍然会运行。

    • 可以使用 Timer(true) 创建 守护线程,当所有用户线程结束时,Timer 自动终止:

      Timer timer = new Timer(true);  // 守护线程模式
      

安排任务执行

Timer 提供了多个 schedule() 方法,用于不同场景的任务调度:

方法 作用
schedule(TimerTask task, long delay) 延迟 delay 毫秒后执行任务(单次执行
schedule(TimerTask task, Date time) 在指定 Date 时间执行任务(单次执行
schedule(TimerTask task, long delay, long period) 延迟 delay 毫秒后首次执行,之后每隔 period 毫秒重复执行(固定延迟
schedule(TimerTask task, Date firstTime, long period) firstTime 首次执行,之后每隔 period 毫秒重复执行(固定延迟
scheduleAtFixedRate(TimerTask task, long delay, long period) 类似 schedule,但采用 固定速率(如果任务执行时间超过 period,会立即执行下一次)

Timer.schedule()

基本语法:

// 单次任务示例
Timer timer = new Timer();
timer.schedule(
    new TimerTask() {  // 定义任务
        @Override
        public void run() {
            System.out.println("任务执行!");
        }
    },
    2000  // 延迟 2 秒执行
);
  • TimerTask:抽象类,表示要执行的任务(类似 Runnable
  • delay:延迟时间(毫秒),表示任务多久后执行

周期性任务示例

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("每隔1秒执行一次");
    }
}, 0, 1000);  // 立即开始,每隔 1000ms 执行

ScheduledExecutorService

由于 Timer 的局限性,Java 5 引入了 ScheduledExecutorService(基于线程池),更推荐使用:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(() -> {
    System.out.println("2秒后执行");
}, 2, TimeUnit.SECONDS);

优势

  • 使用线程池,避免单线程阻塞问题。
  • 支持更灵活的调度(如 scheduleAtFixedRate)。
  • 异常不会影响其他任务。

对比

对比项 Timer ScheduledExecutorService
线程模型 单线程 线程池(可配置多个线程)
异常处理 异常会终止整个 Timer 异常不影响其他任务
灵活性 较低(仅支持简单调度) 高(支持动态调整、复杂调度)
适用场景 简单的单次或周期性任务 高并发、复杂的定时任务调度
  • 如果只是简单的定时任务(如示例中的死锁检测),Timer 足够使用。

  • 如果是生产环境的高并发任务,优先选择 ScheduledExecutorService

Condition

Condition是Java并发包(java.util.concurrent.locks)中的一个接口,它提供了线程间通信的更精细控制方式,可以替代传统的Object.wait()Object.notify()Object.notifyAll()方法。

ConditionLock配合使用,提供了更灵活的线程等待/通知机制。一个Lock可以创建多个Condition对象,允许线程在不同的条件下等待。

创建Condition

通过Lock对象的newCondition()方法创建:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

核心方法

1、await() - 使当前线程等待

类似于Object.wait(),使当前线程释放锁并等待,直到被通知或中断。

condition.await();

主要特点和作用

  1. 释放锁:调用 await() 会原子性地释放与 Condition 关联的锁
  2. 等待通知:线程进入等待状态,直到以下情况之一发生:
    • 其他线程调用该条件的 signal()signalAll() 方法
    • 线程被中断
    • 发生虚假唤醒(spurious wakeup)
  3. 重新获取锁:当线程被唤醒后,在从 await() 返回前会重新获取锁

典型使用模式:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
    while (条件不满足) {  // 必须用 while 而不是 if,防止虚假唤醒
        condition.await(); // 条件不满足时等待
    }
    // 执行条件满足后的操作
} finally {
    lock.unlock();
}

2、signal() - 唤醒一个等待线程

类似于Object.notify(),唤醒一个等待在该条件上的线程。

condition.signal();

3、signalAll() - 唤醒所有等待线程

类似于Object.notifyAll(),唤醒所有等待在该条件上的线程。

condition.signalAll();

4、awaitUninterruptibly() - 不可中断的等待

使当前线程等待,直到被通知,且不响应中断。

5、awaitNanos(long nanosTimeout) - 限时等待

使当前线程等待,直到被通知、中断或超时。返回剩余时间。

6、awaitUntil(Date deadline) - 绝对时间等待

使当前线程等待,直到被通知、中断或到达指定时间。

Condition vs Object监视器方法

特性 Condition Object监视器方法
前置条件 必须持有Lock 必须持有对象监视器锁
多条件支持 一个Lock可创建多个Condition 不支持
等待队列 可以有多个等待队列 只有一个等待队列
中断响应 支持可中断和不可中断等待 只支持可中断等待
超时控制 支持纳秒级超时和绝对时间等待 只支持毫秒级超时

YOLO