Java多线程之锁学习(增强版)

2023-02-26 17:02:20 学习 多线程 增强版

阻塞锁

含义:多个线程同时调用一个方法的时候,所有的线程都被排队处理了,让线程进入阻塞状态进行等待,当获得相应的信号(唤醒、时间)时,才能进入线程的准备就绪的状态。通过竞争。进入运行状态。

Java中,能够进入\退出、阻塞状态或包含阻塞锁的方法有 ,synchronized 关键字(其中的重量锁),ReentrantLock,Object.wait()\notify()

实例代码如下

// 在main方法中,开启100个线程执行 lock() 方法
// 开启10个线程执行 unlock() 方法
// 此时进行加锁和解锁之后,加锁多于解锁的时候,就会一直阻塞等待
public class Demo01 {
    private boolean isLocked = false;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) {
            // 当其他线程进来,即处于等待阻塞状态   
            wait();
        }
        System.out.println("Demo01.lock");
        isLocked = true;
    }

    public synchronized void unlock() {
        isLocked = false;
        System.out.println("Demo01.unlock");
        notify();
    }

    public static void main(String[] args) {
        Demo01 demo01 = new Demo01();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    demo01.lock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                demo01.unlock();
            }).start();
        }
    }
}

由于被调用的方法越耗时,线程越多的时候,等待的线程等待的时间也就越长,甚至于几分钟或者几十分钟。对于WEB等对反应时间要求很高的系统来说,这是不可行的,因此需要让其非阻塞,可以在没有拿到锁之后马上返回,告诉客户稍后重试。

非阻塞锁

含义:多个线程同时调用一个方法的时候,当某一个线程最先获取到锁,这时其他线程没拿到锁,这时就直接返回,只有当最先获取的锁的线程释放锁,其他线程的锁才能进来,在它释放之前其他线程都会获取失败

代码实现如下

// 非阻塞锁
public class Demo02 {
    private boolean isLocked = false;

    public synchronized boolean lock() throws InterruptedException {
        if (isLocked) {
            return false;
        }
        System.out.println("Demo01.lock");
        isLocked = true;
        return true;
    }

    public synchronized boolean unlock() {
        if (isLocked) {
            System.out.println("Demo01.unlock");
            isLocked = !isLocked;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Demo02 demo01 = new Demo02();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    if (demo01.lock()) {
                        System.out.println("获取锁失败");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                if (demo01.unlock()) {
                    System.out.println("解锁成功");
                }
            }).start();
        }
    }
}

锁的四种状态

锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)jdk 1.6中默认是开启偏向锁和轻量级锁的,

注意:HotSpot JVM是支持锁降级的 但是因为降级的效率太低了,所以在开发中不使用降级的操作

但是锁的状态时存在哪里的呢?

锁存在Java的对象头中的Mark Work。Mark Work默认不仅存放着锁标志位,还存放对象hashCode等信息。运行时,会根据锁的状态,修改Mark Work的存储内容。如果对象是数组类型,则虚拟机用3个字宽存储对象头,如果对象是非数组类型,则用2字宽存储对象头。在32位虚拟机中,一字宽等于四字节,即32bit。

字宽(Word): 内存大小的单位概念, 对于 32 位处理器 1 Word = 4 Bytes, 64 位处理器 1 Word = 8 Bytes

每一个 Java 对象都至少占用 2 个字宽的内存(数组类型占用3个字宽)。

  • 第一个字宽也被称为对象头Mark Word。 对象头包含了多种不同的信息, 其中就包含对象锁相关的信息。
  • 第二个字宽是指向定义该对象类信息(class metadata)的指针

无锁状态

在代码刚刚进入同步块的时候,就处于无锁状态。

偏向锁

概念:偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。也就是说,偏向锁在资源无竞争的情况下消除了同步语句,连CAS操作都不做了,提高了程序的运行性能

引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量锁执行路径。因为轻量级锁的获取以及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的CAS原子指令的性能消耗)上面说过,轻量级锁是为了在多线程交替执行同步块时提高性能,而偏向锁则是在只有一个线程执行同步块的时候进一步提高性能。

轻量级锁

“轻量级”是相对于使用操作系统互斥量来实现传统锁而言的。但是首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。

在解释轻量级锁的执行过程过程之前,我们要先明白一点,轻量级锁使用的场景是线程交替同步块的情况,如果存在同一时间访问同一锁的情况,就会导致轻量级锁膨胀为重量级。

重量级锁

synchronized是通过对象内部的一个监视器锁(monitor)实现的。但是monitor底层又依赖于底层操作系统的Mutex Lock实现的。而操作系统实现线程之间的切换就需要从用户态切换到核心态,切换的成本很高,状态之间的转化需要相对比较长的时间,这就是synchronized效率低的原因,因此,这种依赖于操作系统的Mutex Lock所实现的锁被称之为“重量级锁”

可重入锁

可重入锁也叫做递归锁,指的是同一线程外层函数获得锁之后,内存递归函数仍然有获得该锁的代码,但是不受影响

Java中ReentrantLock和synchronized都是可重入锁 自旋锁不是可重入锁

可重入锁的最大作用就是避免死锁

下面是一段实例代码

public class Test implements Runnable {

    public synchronized void get() {
        System.out.println(Thread.currentThread().getId());
        set();
    }

    public synchronized void set() {
        System.out.println(Thread.currentThread().getId());
    }

    @Override
    public void run() {
        get();
    }

    public static void main(String[] args) {
        Test ss = new Test();
        new Thread(ss).start();
        new Thread(ss).start();
        new Thread(ss).start();
    }
}
// 执行结果
// 11
// 11
// 12
// 12
// 13
// 13
public class Test2 implements Runnable {
    ReentrantLock lock = new ReentrantLock();

    public void get() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getId());
            set();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void set() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getId());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        get();
    }

    public static void main(String[] args) {
        Test ss = new Test();
        new Thread(ss).start();
        new Thread(ss).start();
        new Thread(ss).start();
    }
}
// 执行结果
// 11
// 11
// 12
// 12
// 13
// 13

自旋锁

public class SpinLock {
    private AtomicReference<Thread> owner = new AtomicReference<>();

    public void lock() {
        Thread current = Thread.currentThread();
        while (!owner.compareAndSet(null, current)) {
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        owner.compareAndSet(current, null);
    }

改进的自旋锁

public class SpinLock1 {
    private AtomicReference<Thread> owner =new AtomicReference<>();
    private int count =0;
    public void lock(){
        Thread current = Thread.currentThread();
        if(current==owner.get()) {
            count++;
            return ;
        }
        while(!owner.compareAndSet(null, current)){
        }
    }
    public void unlock (){
        Thread current = Thread.currentThread();
        if(current==owner.get()){
            if(count!=0){
                count--;
            }else{
                owner.compareAndSet(current, null);
            }
        }
    }
}

读写锁

Lock接口以及其对象,使用它,很优雅的控制了竞争资源的安全访问,但是这种锁不区别读写 - 为普通锁

为了提高性能,Java提供了读写锁,在读的地方使用读锁,在写的时候使用写锁,灵活控制,如果没有写锁的情况下。读是无阻塞的,在一定情况下提高了程序的执行效率。下面我们来看源代码(Lock接口和Condition接口在上一篇Java - 多线程 - 锁和提升 第1篇已经分析过,此处不再分析)

public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;

    // 读锁内部类对象
    private final ReentrantReadWriteLock.ReadLock readerLock;

    // 写锁内部类对象
    private final ReentrantReadWriteLock.WriteLock writerLock;

    // 执行所有的同步机制
    final Sync sync;

    // 无参构造函数 调用有参构造 ReentrantReadWriteLock(boolean fair)
    public ReentrantReadWriteLock() {
        this(false);
    }

    // 有参构造函数 是否创建公平锁
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    //  返回读锁和写锁的对象
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    // 继承自 AbstractQueueSynchronizer 的Sync
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        // 共享移位
        static final int SHARED_SHIFT   = 16;
        // 共享单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 最大数量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 独占
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        // 返回共享锁的数量 
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        // 返回独占锁的数量
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

        // 每线程读取保持计数的计数器
        static final class HoldCounter {
            int count = 0;
            // 使用id 而不是使用引用 避免垃圾残留
            final long tid = getThreadId(Thread.currentThread());
        }

        // 线程本地子类
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        // 当前线程持有的可重入锁的数量,只在构造函数和readObject中初始化,
        // 当线程的读取保持计数下降到0的时候删除
        private transient ThreadLocalHoldCounter readHolds;

        // 获取readLock的最后一个线程的保持计数
        private transient HoldCounter cachedHoldCounter;

        // 第一个获得读的线程
        private transient Thread firstReader = null;
        // 计数器
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }

        // 当前线程获取锁的时候由于策略超过其他等待线程而应阻止,返回true
        abstract boolean readerShouldBlock();

        // 如果由于试图超越其他等待线程的策略而导致当前线程在尝试获取写锁(且有资格这样做)
        // 时应阻塞,则返回true
        abstract boolean writerShouldBlock();

        // 尝试释放
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

        // 尝试增加
        protected final boolean tryAcquire(int acquires) {
            
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        // 尝试释放共享锁
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

        // 非法的监视器状态异常
        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }

        // 尝试加共享锁
        protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

        // 读取的完整版本,可处理CAS丢失和tryAcquireShared中未处理的可重入读取
        final int fullTryAcquireShared(Thread current) {

            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;

                } else if (readerShouldBlock()) {

                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

        // 执行tryLock进行写入,从而在两种模式下都可以进行插入,这与tryAcquire的作用相同
        // 只是缺少对 writerShouldBlock的调用
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        // 执行tryLock进行读取,从而在两种模式下都可以进行插入。 
        // 除了没有调用readerReaderShouldBlock以外,这与tryAcquireShared的作用相同。
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

        // 是否独占锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 返回Condition对象
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // 返回独占锁
        final Thread getOwner() {
            return ((exclusiveCount(getState()) == 0) ?
                    null :
                    getExclusiveOwnerThread());
        }

        // 获取readLock所得个数
        final int getReadLockCount() {
            return sharedCount(getState());
        }

        // 判断是否是 writeLock 锁
        final boolean isWriteLocked() {
            return exclusiveCount(getState()) != 0;
        }

        // 获得写锁的数量
        final int getWriteHoldCount() {
            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
        }

        // 获得读锁的数量
        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }

        // 序列化
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // reset to unlocked state
        }

        // 获取数量
        final int getCount() { return getState(); }
    }

    // 非公平锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

    // 读锁实现了 Lock 接口
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;

        // 初始化,获得是否是公平读锁
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        // 加锁 是共享锁
        public void lock() {
            sync.acquireShared(1);
        }

        // 锁中断
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        // 调用加锁
        public boolean tryLock() {
            return sync.tryReadLock();
        }

        // 超时加锁
        public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        // 解锁
        public void unlock() {
            sync.releaseShared(1);
        }

        // Condition 对象
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

        // toString方法
        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        }
    }

    // 写锁
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;

        // 写锁初始化
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        // 加锁
        public void lock() {
            sync.acquire(1);
        }

        // 锁中断
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        // 加锁
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

        // 超时加锁
        public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        // 解锁,调用 release 减1
        public void unlock() {
            sync.release(1);
        }

        // Condition对象
        public Condition newCondition() {
            return sync.newCondition();
        }

        // toString方法
        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        }

        // 是否有独占的线程
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }

        // 获取写锁的数量
        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }

    // 判断是否是公平锁
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    // 判断是否是独占锁
    protected Thread getOwner() {
        return sync.getOwner();
    }

    // 获取读锁的个数
    public int getReadLockCount() {
        return sync.getReadLockCount();
    }

    // 判断是否是写锁
    public boolean isWriteLocked() {
        return sync.isWriteLocked();
    }

    // 判断锁是否被当前线程持有
    public boolean isWriteLockedByCurrentThread() {
        return sync.isHeldExclusively();
    }

    // 查询当前线程对该锁的可重入写入次数
    public int getWriteHoldCount() {
        return sync.getWriteHoldCount();
    }

    // 查询当前线程对该锁的可重读次数
    public int getReadHoldCount() {
        return sync.getReadHoldCount();
    }

    // 返回写线程的集合
    protected Collection<Thread> getQueuedWriterThreads() {
        return sync.getExclusiveQueuedThreads();
    }

    // 返回读线程的集合
    protected Collection<Thread> getQueuedReaderThreads() {
        return sync.getSharedQueuedThreads();
    }

    // 查询是否有任何线程正在等待获取读或写锁
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    //  查询指定线程是否在等待读或者写锁
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    // 获取等待队列的长度
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    // 获得等待队列中的线程集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    // 查询是否有任何线程正在等待与写锁关联的给定条件
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    // 查询是否有任何线程正在等待与写锁关联的给定条件的长度
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    // 获得给定条件等待线程的集合
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    // toString()方法
    public String toString() {
        int c = sync.getCount();
        int w = Sync.exclusiveCount(c);
        int r = Sync.sharedCount(c);

        return super.toString() +
            "[Write locks = " + w + ", Read locks = " + r + "]";
    }

    // 获取线程ID
    static final long getThreadId(Thread thread) {
        return UNSAFE.getLongVolatile(thread, TID_OFFSET);
    }

    // Unsafe对象
    private static final sun.misc.Unsafe UNSAFE;
    private static final long TID_OFFSET;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            TID_OFFSET = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("tid"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

总结

  • 从读写锁的实现代码,我们可以看到,其本质还是使用了AQS的独占锁和共享锁实现了读写分离。
  • 当需要进行读的时候使用共享锁,当需要写的时候使用独占锁
  • 同时,ReentrantReadWriteLock也提供了默认的非公平机制,当然也可以使用构造方法设置是否是公平锁

互斥锁

互斥锁指的是一次最多只能有一个线程持有的锁。如Java的Lock

互斥锁也是为了保护共享资源的同步,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。互斥锁和自旋锁在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁。

悲观锁

悲观锁,顾名思义就是狠悲观,认为每次拿去的数据都会被修改,所以在每次拿锁的时候都会上锁,这样别人想拿到这个数据就会block到直到他拿到锁。传统的数据库就使用了很多的这种的机制:如行锁、表锁、读锁、写锁等,都是在做操作之前上锁。共享锁、排他锁、独占锁是悲观锁的一种实现。

Java中的悲观锁,最典型的就是synchronized。而AQS框架下的锁,先尝试使用CAS乐观锁去获取锁,获取不到才会转为悲观锁,如ReentrantLock

乐观锁

乐观锁,顾名思义就是很乐观,每次拿去的数据都认为不会被修改,所以不会上锁。但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,对于此过程出现的ABA问题,可以使用版本号进行控制。

乐观锁用于多读的场景,数据库提供的类似于write_condition机制都是使用了乐观锁,使用CAS保证这个操作的原子性

乐观锁和悲观锁的比较

  • 乐观锁不是数据库自己实现的,悲观锁是数据库自己实现的
  • 两种锁各有优缺点,不能认为一种好于另外一种,乐观锁适用于写少的场景,也就是冲突发生很少的情况,这样省了锁的开销,加大了系统的吞吐量。
  • 但是如果经常发生冲突,上次应用不会retry,此时为了保证安全和维持性能,应该使用悲观锁

公平锁

公平锁。就是字面意思,公平的,是非抢占式的,一个一个排好队,等待执行,但是有缺点。如果某个线程执行的时间过长,会导致阻塞。比如ReentrantLock中的内部类 FairSync和ReentrantReadWriteLock中的内部类FairSync都是公平锁

非公平锁

非公平锁,及时字面以自,抢占式的,不管谁先来,谁后来,抢到了就是我的。比如ReentrantLock中的内部类 NonfairSync和ReentrantReadWriteLock中的内部类NonfairSync都是非公平锁

显示锁和内置锁

显示锁,是人为手动的锁,如:ReentrantLock、Lock锁,也就是说,实现了Lock的锁都是显示锁

内置锁:内置锁使用synchronized,内置锁是互斥锁

Java中每个对象都可以用作一个实现同步的锁。 线程进入同步代码块或方法的时候会自动获得该锁,在退出同步代码块或方法时会释放该锁。获得内置锁的唯一途径就是进入这个锁的保护的同步代码块或方法。

轮询锁和定时锁

由tryLock实现,与无条件获取锁模式相比,它们具有更完善的错误恢复机制。可避免死锁的发生

boolean tryLock():仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true。如果锁不可用,则此方法将立即返回值 false。

tryLock的重载 tryLock(time,TimeUnit)就是定时锁

对象锁和类锁

Java的对象锁和类锁,其实也就是 Java - 多线程 - 锁和提升 第1篇开篇所说的8锁 8锁核心思想

  • 关键字在实例方法上,锁为当前实例
  • 关键字在静态方法上,锁为当前Class对象
  • 关键字在代码块上,锁为括号里面的对象

1.对象锁和类锁在基本概念上和内置锁是一致的,但是,两个锁是有很大的区别,对象锁适用于对象的实例方法,或者一个对象实例上的,类锁是作用于类的静态方法或者一个类的class对象上的。

2.类的实例可以有多个,但是每个类只有一个class对象,不同实例的对象锁是互不相干的,但是每个类只有一个类锁。

3.其实类锁只是一个概念上的东西,并不是真实存在的,它只是用来帮我们理解锁定实例方法和静态方法的区别。

4.synchronized只是一个内置的加锁机制,当某个方法加上synchronized关键字的后,就表明要获得该内置锁才能执行,并不能阻止其他线程访问不需要获得该锁的方法。

5.调用对象的wait()方法的时候,会释放持有的对象锁,以便于调用 notify() 方法使用。notify()调用之后,会等到notify()所在的线程执行完毕之后再释放锁。

锁粗化

就是将多次连接在一起的加锁、解锁操作合并为一次,将多个连续的锁拓展为一个更大的锁。

通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽可能短,但是大某些情况下,一个程序对同一个锁不间断、高频地请求、同步与释放,会消耗掉一定的系统资源,因为锁的讲求、同步与释放本身会带来性能损耗,这样高频的锁请求就反而不利于系统性能的优化了,虽然单次同步操作的时间可能很短。锁粗化就是告诉我们任何事情都有个度,有些情况下我们反而希望把很多次锁的请求合并成一个请求,以降低短时间内大量锁请求、同步、释放带来的性能损耗。

锁消除

锁消除即:删除不必要的加锁操作。根据代码逃逸技术,如果判断到一段代码中,堆上的数据不会逃逸出当前线程。那么可以认定这段代码是线程安全的,不必要加锁。

锁消除是发生在编译器级别的一种锁优化方式。有时候我们写的代码完全不需要加锁,却执行了加锁操作。

比如,StringBuffer类的append操作:

@Override
public synchronized StringBuffer append(String str) {
    toStrinGCache = null;
    super.append(str);
    return this;
}

源码中可以看出,append方法用了synchronized关键词,它是线程安全的。但我们可能仅在线程内部把StringBuffer当作局部变量使用:

public class Demo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int size = 10000;
        for (int i = 0; i < size; i++) {
            createStringBuffer("ic", "java");
        }
        long timeCost = System.currentTimeMillis() - start;
        System.out.println("createStringBuffer:" + timeCost + " ms");
    }
    public static String createStringBuffer(String str1, String str2) {
        StringBuffer sBuf = new StringBuffer();
        // append方法是同步操作
        sBuf.append(str1);
        sBuf.append(str2);
        return sBuf.toString();
    }
}

代码中createStringBuffer方法中的局部对象sBuf,就只在该方法内的作用域有效,不同线程同时调用createStringBuffer()方法时,都会创建不同的sBuf对象,因此此时的append操作若是使用同步操作,就是白白浪费的系统资源。

这时我们可以通过编译器将其优化,将锁消除,前提是java必须运行在server模式(server模式会比client模式作更多的优化),同时必须开启逃逸分析:

-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks

其中+DoEscapeAnalysis表示开启逃逸分析,+EliminateLocks表示锁消除。

逃逸分析:比如上面的代码,它要看sBuf是否可能逃出它的作用域?如果将sBuf作为方法的返回值进行返回,那么它在方法外部可能被当作一个全局对象使用,就有可能发生线程安全问题,这时就可以说sBuf这个对象发生逃逸了,因而不应将append操作的锁消除,但我们上面的代码没有发生锁逃逸,锁消除就可以带来一定的性能提升。

信号量

信号量有一个线程同步工具:Semaphore

下面我们来分析一下源码

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    // Sync锁
    private final Sync sync;

    // Sync实现了AbstractQueueSynchronizer
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        //  初始化版本初始值
        Sync(int permits) {
            setState(permits);
        }

        // 获取状态
        final int getPermits() {
            return getState();
        }

        // 不公平尝试共享
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 尝试释放共享
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 减少状态的指定值
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    // 非公平锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    // 构造方法 设置初始版本号
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    // 构造方法
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    // 增加1
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 不间断地获取
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    // 从此信号量获取许可,直到获得一个许可为止,将一直阻塞
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    // 仅在调用时可用时,才从此信号量获取许可
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    // 定时设置
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 释放
    public void release() {
        sync.releaseShared(1);
    }

    // 设置指定的个数
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    // 获得许可证的给定数目从这个信号,阻塞直到所有可用。
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    // 获得许可证的给定数目从此信号量,只有在所有可在调用的时候。
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    // 收购从此信号量许可证的给定数量,如果所有给定的等待时间内变得可用,并且当前线程未被中断 
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    // 释放
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    // 返回现在的剩余值
    public int availablePermits() {
        return sync.getPermits();
    }

    // 返回减少之后的值
    public int drainPermits() {
        return sync.drainPermits();
    }

    // 减少指定个数的值
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    // 判断是否时公平锁
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    // 判断队列是否有线程 
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    // 返回线程的长度
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    // 返回队列线程的集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    // toString() 方法
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

总结:

我们发现其本质还是 AbstractQueueSynchronizer 中的共享模式和独占模式

此类也有公平和非公平的实现

独享锁

独享锁,也叫独占锁,意思是锁A只能被一个锁拥有,如synchronized,

ReentrantLock是独享锁,他是基于AQS实现的,在ReentrantLock源码中, 使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁 。 而当c等于0的时候说明当前没有线程占有锁,它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,所以AQS可以确保对state的操作是安全的。

// 它默认是非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

// 创建ReentrantLock,公平锁or非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

// 而他会分别调用lock方法和unlock方法来释放锁
public void lock() {
    sync.lock();
}

public void unlock() {
    sync.release(1);
}

但是其实他不仅仅是会调用lock和unlock方法,因为我们的线程不可能一点问题没有,如果说进入到了waiting状态,在这个时候如果没有unpark()方法,就没有办法来唤醒他, 所以,也就接踵而至出现了tryLock(),tryLock(long,TimeUnit)来做一些尝试加锁或者说是超时来满足某些特定的场景的需求了

ReentrantLock会保证method-body在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的lock方法会返回。其余线程会被挂起,直到获取锁。

从这里我们就能看出,其实ReentrantLock实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。而在源码中通过AQS来获取独享锁是通过调用acquire方法,其实这个方法是阻塞的

共享锁

从我们之前的独享所就能看得出来,独享锁是使用的一个状态来进行锁标记的,共享锁其实也差不多,但是JAVA中有不想定力两个状态,所以区别出现了, 他们的锁状态时不一样的。

基本的流程是一样的,主要区别在于判断锁获取的条件上,由于是共享锁,也就允许多个线程同时获取,所以同步状态的数量同时的大于1的,如果同步状态为非0,则线程就可以获取锁,只有当同步状态为0时,才说明共享数量的锁已经被全部获取,其余线程只能等待。

最典型的就是ReentrantReadWriteLock里的读锁,它的读锁是可以被共享的,但是它的写锁确每次只能被独占。

总结

  • 独享锁:同时只能有一个线程获得锁。
  • 共享锁:可以有多个线程同时获得锁。

分段锁

在JDK1.7之前,HashMap的底层是数组+链表。同样的,ConcurrentHashMap的底层树结构是数组+链表,但是和HashMap不一样的是,ConcurrentHashMap的中存放数据是一段一段的。即由很多个Segment(段)组成的,每个Segment中都有着类似于数组+链表的结构

关于Segment

1.ConcurrentHashMap由三个参数

  • initalCapacity:初始化总容量,默认值为16
  • loadFactor:加载因子,默认0.75
  • concurrentLevel:并发级别,默认16

2.其中的并发级别控制了Segment的个数。在y一个ConcurrentHashMap创建后Segment的个数是不能变的,扩容过程是改变每个Segment的大小

关于分段锁

Segment继承了重入锁ReentrantLock,有了锁的的功能。每个锁控制的是一段,当每个Segment越来越大的时候,锁的粒度就越来越大了

  • 分段锁的优势是保证造操作不同段map的时候进行锁的竞争和等待。这相当于直接对整个map同步synchronized只是有优势的
  • 缺点在于分成很多段的时候会浪费很多的内存空间(不连续,碎片化),操作map的时候竞争一个分段锁概率狠小的时候,分段锁反而会造成更新等操作的长时间等待,分段锁的性能会下降

JDK1.8的map实现

JDK中的HashMap和ConcurrentHashMap。底层数据结构为数组+链表+红黑树。数组可以扩容,链表可以转化为红黑树(本篇文章不对红黑树做讲解,之前已经分析过)

新版的ConcurrentHashMap为什么不使用ReentrantLock而使用synchronized?

减少内存开销:如果使用ReenteantLock则需要节点继承AQS来获得同步支持,增加内存开销,而1.8中只有头节点需要同步

内部优化:synchronized是JVM直接支持的,JVM能在运行时做出相应的优化措施:锁粗化、锁消除、锁自旋等

锁的粒度

首先锁的粒度并没有变粗,甚至变得更细了。每当扩容一次,ConcurrentHashMap的并发度就扩大一倍。

Hash冲突

JDK1.7中,ConcurrentHashMap从过二次hash的方式(Segment -> HashEntry)能够快速的找到查找的元素。在1.8中通过链表加红黑树的形式弥补了put、get时的性能差距。

扩容

JDK1.8中,在ConcurrentHashmap进行扩容时,其他线程可以通过检测数组中的节点决定是否对这条链表(红黑树)进行扩容,减小了扩容的粒度,提高了扩容的效率。

死锁案例和排查

public class DeadLockDemo {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";
        MyThread myThread1 = new MyThread(lockA,lockB);
        MyThread myThread2 = new MyThread(lockB,lockA);
        new Thread(myThread1).start();
        new Thread(myThread2).start();
    }

}

class MyThread implements Runnable {

    private String lockA;
    private String lockB;

    public MyThread(String lockA, String lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public void run() {
        synchronized (lockA) {
            System.out.println(Thread.currentThread().getName() + "lock:" + lockA + " => " + lockB);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lockB) {
                System.out.println(Thread.currentThread().getName() + "lock:" + lockB + " => " + lockA);
            }
        }
    }
}

到此这篇关于Java多线程之锁学习(增强版)的文章就介绍到这了,更多相关Java多线程 锁内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关文章