Java 多线程并发ReentrantLock

2022-11-13 10:11:34 java 多线程 并发

背景

在 Java 中实现线程安全的传统方式是 synchronized 关键字,虽然它提供了一定的同步能力,但它在使用上是严格的互斥同步实现:一个线程只能获取一次锁,没有给其他线程提供等待队列等机制,以至于当一个锁被释放后,任意线程都有可能获取到锁,没有线程等待的优先级顺序,会导致重要的线程在没有争用到锁的情况下,长时间阻塞。为了解决 synchronized 的痛点,Java 提供了 ReentrantLock 可重入锁来提供更丰富的能力和灵活性。

ReentrantLock

ReentrantLock 是一种可重入互斥锁,其基本能力与使用 synchronized 关键字相同,但拓展了一些功能。它实现了 Lock 接口,在访问共享资源时提供了同步的方法。操作共享资源的代码被加锁和解锁方法的调用之间,从而确保当前线程在调用加锁方法后,阻止其他线程试图访问共享资源。

可重入特性

ReentrantLock 由上次成功锁定的但尚未解锁的线程持有;当锁不被任何线程拥有时,调用 lock 方法的线程将获取到这个 ReentrantLock,如果当前线程已经拥有 ReentrantLock ,lock 方法会立即返回。

ReentrantLock 允许线程多次进入资源锁。当线程第一次进入锁时,保持计数设置为 1。在解锁之前,线程可以再次重新进入锁定状态,并且每次保持计数加一。对于每个解锁请求,保持计数减一,当保持计数为 0 时,资源被解锁。

公平锁设置参数

ReentrantLock 的构造器接收一个可选的 fairness 参数(Boolean 类型)。当设置为 true 时,在线程争用时,锁优先授予等待时间最长的线程访问。否则,此锁不保证任何特定的顺序。但是请注意,锁的公平性不能保证线程调度的公平性。

可重入锁还提供了一个公平参数,通过该参数,锁将遵循锁请求的顺序,即在线程解锁资源后,锁将转到等待时间最长的线程。这种公平模式是通过将 true 传递给锁的构造函数来设置的。

源码分析

Lock 接口

ReentrantLock 实现了 Lock 接口,所以分析源码先从 Lock 接口开始:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();​
    Condition newCondition();
}

Lock 接口定义了更灵活和更广泛的锁定操作。synchronized 关键字是 JVM 底层提供了 monitor 指令的形式加锁,这导致了获取多个锁时,需要按获取顺序的倒序解锁。Lock 就是为了解决这种不够灵活的问题而出现的。Lock 接口的实现通过允许在不同范围内获取和释放锁以及允许多个锁按任意顺序的获取和释放。随着这种灵活性的增加,额外的职责也就随之而来,synchronized 关键字以代码块的结构加锁,执行完成锁会自动释放,而 Lock 的实现则需要手动释放锁,大多数情况下,

应该使用下面的语句实现:

 Lock l = ...;
 l.lock();
 try {
   // access the resource protected by this lock
 } finally {
   l.unlock();
 }

当锁定和解锁发生在不同的作用域时,必须注意确保所有在持有锁时执行的代码都受到 try-finally 或 try-catch 的保护,以确保在必要时释放锁。

Lock 接口中定义的方法可以划分为三部分:

  • 加锁操作
  • 解锁操作
  • newCondition

加锁操作

加锁操作提供了四个方法:

    // 获取锁,如果锁不可用,则当前线程将被禁用以用于线程调度目的并处于休眠状态,直到获取到锁为止。
    void lock();​
    void lockInterruptibly() throws InterruptedException;
   boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

lock():获取锁,如果无法获取到,则当前线程进入阻塞状态,直到获取到锁为止。

lockInterruptibly():除非当前线程被中断,否则去获取锁。如果获取到了锁,则立即返回。如果没有争用到锁,则当前线程阻塞,直到发生下面两种情况之一:

如果当前线程:

以上两种情况都会抛出 InterruptedException ,并清除当前线程的中断状态。

  • 当前线程获取到了锁
  • 其他线程中断了当前线程
  • 在进入此方法时,设置了中断状态
  • 在获取锁的过程中被中断

tryLock()

仅当锁处于空闲状态时,才获取锁。获取到锁立即返回 true,如果锁被其他线程持有,则此方法立即返回 false 。

此方法的典型用法是:

 Lock lock = ...;
 if (lock.tryLock()) {
   try {
     // manipulate protected state
   } finally {
     lock.unlock();
   }
 } else {
   // perfORM alternative actions
 }

这种用法确保锁在获得时解锁,并且在未获得锁时不尝试解锁。

tryLock(long time, TimeUnit unit)

  • 如果在给定时间内锁处于空闲状态,且当前线程没有被中断,则获取锁。
  • 如果当前线程成功获取到了锁,则此方法立即返回 true ;如果当前线程无法获取到锁,则当前线程会进入阻塞状态直到发生下面三种情况之一:
  • 如果进入此方法时当前线程处于中断状态或获取锁的过程中已进入中断状态,以上两种情况都会抛出 InterruptedException ,并清除当前线程的中断状态。
  • 此外,如果 time 参数小于等于 0 ,该方法不会等待。
    • 锁被当前线程成功获取
    • 指定时间超时
    • 其他线程中断了当前线程

解锁操作:

解锁操作只提供了 unlock() 方法。

newCondition:

返回绑定到此 Lock 的 Condition 实例。

内部类

ReentrantLock 有三个内部类,分别是 Sync、NonfairSync、FairSync 。

它们的继承关系是:

Sync

这个类是 AQS 的直接实现,它为公平锁实现 FairSync 和非公平锁实现 NonfairSync 提供了共同的基础能力。

abstract static class Sync extends AbstractQueuedSynchronizer {
    @ReservedStackAccess
    final boolean tryLock()
    abstract boolean initialTryLock();
    @ReservedStackAccess
    final void lock()
    @ReservedStackAccess
    final void lockInterruptibly()
    @ReservedStackAccess
    final boolean tryLockNanos(long nanos)
    @ReservedStackAccess
    protected final boolean tryRelease(int releases)

    protected final boolean isHeldExclusively()
    final ConditionObject newCondition()
    final Thread getOwner()
    final int getHoldCount()
    final boolean isLocked()
}

下面是一些重点的方法讲解。

tryLock

这个方法执行了一个不公平的尝试加锁操作:

    @ReservedStackAccess
    final boolean tryLock() {
        Thread current = Thread.currentThread();    // 获取当前线程
        int c = getState();                         // 从 AQS 中获取状态
        if (c == 0) {                               // 当前锁的状态为未被持有
            if (compareAndSetState(0, 1)) {         // CAS 更新状态为加锁状态 1
                setExclusiveOwnerThread(current);   // 设置当前持有的线程
                return true;                        // 获取锁成功,return true
            }
        } else if (getExclusiveOwnerThread() == current) {  // 如果当前持有锁的线程是当前线程
            if (++c < 0) // overflow                        // c 即是状态也是计数器,可重入计数 + 1
                throw new Error("Maximum lock count exceeded");
            setState(c);                                    // 更新状态
            return true;                                    // 重入成功,return true
        }
        return false;                                       // 尝试获取锁失败。
    }

为什么说它是不公平的,因为这个方法没有按照公平等待原则,让等待时间最久的线程优先获取锁资源。

initialTryLock

这是一个抽象方法,用来在 lock 前执行初始化工作。

lock

    @ReservedStackAccess
    final void lock() {
        if (!initialTryLock())
            acquire(1);
    }

先根据 initialTryLock() 进行判断,然后调用 acquire(1) ,acquire 方法在 AQS 中:

    public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }

这个方法会让当前线程去尝试获取锁资源,并忽略中断。通过调用 tryAcquire 至少一次来实现,如果失败,则去等待队列排队,可能会导致阻塞。

lockInterruptibly

    @ReservedStackAccess
    final void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!initialTryLock())
            acquireInterruptibly(1);
    }

这个方法相当于在 lock 方法前首先进行了线程中断检查,如果没有被中断,也是通过 initialTryLock() 判断是否需要执行尝试获取锁的操作。与 lock 方法不同,这里调用的是 (1)

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted() || (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
        throw new InterruptedException();
}

对线程中断进行了检查,如果线程被中断则中止当前操作,至少调用 1 次 tryAcquire 尝试去获取锁资源。否则线程去队列排队,此方法可能会导致阻塞,直到调用 tryAcquire 成功或线程被中断。

tryLockNanos

        final boolean tryLockNanos(long nanos) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return initialTryLock() || tryAcquireNanos(1, nanos);
        }
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        if (!Thread.interrupted()) {
            if (tryAcquire(arg))
                return true;
            if (nanosTimeout <= 0L)
                return false;
            int stat = acquire(null, arg, false, true, true,
                               System.nanoTime() + nanosTimeout); // 多了一个超时时间
            if (stat > 0)
                return true;
            if (stat == 0)
                return false;
        }
        throw new InterruptedException();
    }

本质上调用 acquire ,多设置了一个 time 参数。

tryRelease

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            boolean free = (c == 0); // c = 0 说明成功释放锁资源
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }

可以看出,tryRelease 方法最终更新了 State ,进一步说明了 AQS 的实现,本质上都是通过原子 int 来表示同步状态的。

newCondition

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

这里的 newCondition 返回的是 AQS 的内部类 ConditionObject 的实例。

Sync 中的方法与其含义:

NonfairSync 非公平锁

    static final class NonfairSync extends Sync {
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { // 比较并设置状态成功,状态0表示锁没有被占用
                setExclusiveOwnerThread(current); // 设置当前线程为持有锁的线程
                return true;
            } else if (getExclusiveOwnerThread() == current) { // 重入情况
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }
​
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

NonfairSync 实现了 initialTryLock() ,其中主要是为当前对象设置持有线程;如果是重入的情况,则 state 计数 + 1 。这个方法中的逻辑和 tryLock 方法十分相似,他们都是不公平的。每次尝试获取锁,都不是按照公平等待的原则,让等待时间最久的线程获得锁,所以这是不公平锁。

FairSync

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 锁处于可用状态
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) { // 查询是否有线程正在等待获取此锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }
        
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

公平锁依赖两个判断条件实现:

  • hasQueuedThreads 用来查询是否有其他线程正在等待获取此锁。
  • hasQueuedPredecessors 是用来查询是否有其他线程比当前线程等待的时间更长。

当存在其他线程等待时间更久时,当前线程的 tryAcquire 会直接返回 false 。

构造函数

ReentrantLock 有两个构造函数:

    public ReentrantLock() {
        sync = new NonfairSync();
    }​
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

其中一个带有 boolean 参数的构造方法,用来根据参数 fair 实现公平锁或非公平锁,无参构造方法默认实现是非公平锁。

核心属性和方法

private final Sync sync;

从构造方法中就可以看出,ReentrantLock 的 sync 属性,代表了锁的策略(公平 or 非公平)。

sync 是一个 Sync 类型的对象,继承自 AQS ,ReentrantLock 对外暴露的方法,内部实际上就是调用 Sync 对应的方法实现的:

public class ReentrantLock implements Lock, java.io.Serializable {
    // ...
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.lockInterruptibly();
    }
    
    public boolean tryLock() {
        return sync.tryLock();
    }
    
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryLockNanos(unit.toNanos(timeout));
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    public int getHoldCount() {
        return sync.getHoldCount();
    }
    
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    
    public boolean isLocked() {
        return sync.isLocked();
    }
    
    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    
    protected Thread getOwner() {
        return sync.getOwner();
    }
    // ... 
}

ReentrantLock 看起来就像是 Sync 的代理类,当调用 ReentrantLock 对外暴露的方法时,会根据 sync 对象的不同的类型调用不同的实现 。

比如,下图就是一个公平锁的调用过程:

ReentrantLock.lock -> 
FairSync.lock -> 
AQS.acquire -> 
FairSync.tryAcquire -> 
AQS.hasQueuedPredecessors -> AQS.setExclusiveOwnerThread

总结

ReentrantLock 实现了 Lock 接口,有三个内部类,其中 Sync 继承自 AQS ,而后两者继承自 Sync ,它们都继承了 AQS 的能力。本质上来说 ReentrantLock 的底层原理就是 AQS 。

在 Sync 的两个子类 FairSync 和 NonfairSync 分别是公平锁策略和非公平锁策略的实现,它们通过实现initialTryLock()方法中不同的逻辑(公平锁多了一个检查是否有其他等待线程的条件)。然后实现了不同的 tryAcquire(int acquires) ,从而在线程尝试获取锁时,执行不同的策略。

到此这篇关于Java 多线程并发ReentrantLock的文章就介绍到这了,更多相关Java ReentrantLock内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关文章