Java 多线程与并发(七):ReentrantLock 与 ReentrantReadWriteLock

Wesley13
• 阅读 829

ReentrantLock

我们已经通过前几章学会了 synchronized 和 AQS 等相关只是。下面我们继续来学习 ReentrantLock 这个并发工具类,如果你已经了解了 AQS 的机制,那么你学习 ReentrantLock 将会非常轻松。

背景

Synchronized 关键字虽然在 JDK 1.6 做了很多优化,但是它的底层是由 JVM 通过 CPU 指令去实现的,这就使得程序员无法对他进行扩展和优化。比如线程获取不到锁就会一直阻塞,也无法中断一个正在获取锁的线程。所以大神 Doug Lea,在 AQS 的基础上为我们构建了一个更加灵活的锁,他可以实现定时获取锁,公平/非公平锁,获取锁的过程中被中断等功能,这个就是 ReentrantLock。

代码分析

ReentrantLock 的字面意思是重入锁,synchronized 也是可重入的,所以我们可以认为 ReentrantLock 的作用就是用来替换 synchronized,实现它所不能实现的功能。

我们来通过 ReentrantLock 的 acquire 和 release 来了解一下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();//当前线程
    int c = getState();
    if (c == 0) {//表示锁未被抢占
        if (compareAndSetState(0, acquires)) {//使用 CAS 尝试获取到同步状态,获取到同步状态就是获取到了锁。
            setExclusiveOwnerThread(current); //当前线程占有锁
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//线程已经占有锁了 重入
        int nextc = c + acquires;//同步状态记录重入的次数
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; //既然可重入 就需要释放重入获取的锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;//只有线程全部释放才返回true
        setExclusiveOwnerThread(null); //同步队列的线程都可以去获取同步状态了
    }
    setState(c); 
    return free;
}

看了这段代码我们也就理解了 ReentrantLock 如何实现加锁解锁以及重入锁的功能了。

  1. 通过 CAS 操作尝试更改 state,更改成功视为获取到锁。
  2. 如果是同一个线程再次获得锁,增加重入次数,存入到 state 中。
  3. release 操作正好和 acquire 相反。

其实如果你明白了上面这段代码,整个 ReentrantLock 你也就明白了,同理我们可以通过控制 State(AQS 提供了操作 State 的各种方法)来实现自己的锁。

上面的方法是 ReentrantLock 操作 State 的方法,在将它封装以下就可以完成锁的获取和释放方法了。

释放锁

public final boolean release(int arg) {
        if (tryRelease(arg)) {//调用子类的tryRelease 实际就是Sync的tryRelease
            Node h = head;//取同步队列的头节点
            if (h != null && h.waitStatus != 0)//同步队列头节点不为空且不是初始状态
                unparkSuccessor(h);//释放头节点 唤醒后续节点
            return true;
        }
        return false;
}

Sync 的 tryRelease 就是上一小节的重入锁释放方法,如果是同一线程,那么锁的重入次数就依次递减,直到重入次数为 0,此方法才会返回 ture,此时断开头节点唤醒后续节点去获取 AQS 的同步状态。

获取锁

非公平锁

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
    final void lock() {
        if (compareAndSetState(0, 1))//通过CAS来获取同步状态 也就是锁
            setExclusiveOwnerThread(Thread.currentThread());//获取成功线程占有锁
        else
            acquire(1);//获取失败 进入AQS同步队列排队等待 执行AQS的 acquire  方法 
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

当调用 lock 方法后,首先去尝试获取 state,这也是非公平的体现,抢到 AQS 的同步状态的未必是队列的首节点。抢不到就进入 AQS 的 acquire 方法。

公平锁

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);//严格按照AQS的同步队列要求去获取同步状态
    }

    /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();//获取当前线程
        int c = getState();
        if (c == 0) {//锁未被抢占
            if (!hasQueuedPredecessors() &&//没有前驱节点
                compareAndSetState(0, acquires)) {//CAS获取同步状态
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//锁已被抢占且线程重入
            int nextc = c + acquires;//同步状态为重入次数
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

公平锁直接调用 AQS 的 acquire 方法,acquire 中自旋调用 tryAcquire。于非公平锁相比,少了那次 CAS 直接抢锁的过程。接下来在tryAcquire去抢占锁的时候,也会先调用hasQueuedPredecessors看看前面是否有节点已经在等待获取锁了,如果存在则同步队列的前驱节点优先。

公平锁因为有了大量的线程的切换而使吞吐量有所下降。

使用

类似于 synchronized 的锁。

Lock lock = new ReentrantLock();
try{
    lock.lock();
    //...
}fianlly{
    lock.unlock();
}

带限制的锁:

public boolean tryLock()// 尝试获取锁,立即返回获取结果 轮询锁
public boolean tryLock(long timeout, TimeUnit unit)//尝试获取锁,最多等待 timeout 时长 超时锁
public void lockInterruptibly()//可中断锁,调用线程 interrupt 方法,则锁方法抛出 InterruptedException  中断锁

比较

上一篇文章提到过,AQS 里面提供了等待队列方便我们实现更细粒度的等待通知。

synchronized

ReentrantLock

使用Object本身的wait、notify、notifyAll调度机制

与Condition结合进行进行更细粒度的线程的调度

显式的使用在同步方法或者同步代码块

显式的声明指定起始和结束位置

托管给JVM执行,不会因为异常、或者未释放而发生死锁

手动释放锁

由于 JDK 1.6 对 synchronized 的优化,它现在性能也不差,所以可以优先使用 synchronized,如果需要更多功能的锁,可以使用 ReentrantLock。

ReentrantReadWriteLock

背景

ReentrantLock 归根结底还是一个互斥锁,同一时刻只能有一个线程持有这把锁。互斥虽然能够保证线程安全,避免读/读,读/写,写/写冲突,但是读/读这样问题其实根本不需要互斥,这就使得在读比较多的问题中,ReentrantLock 牺牲了一部分性能。

如果我们考虑适当的放宽这个条件,多个线程同时读不互斥,这样性能就会提升很多,因此就诞生了 ReentrantReadWriteLock。

ReentrantReadWriteLock 中的读锁是一把共享锁,基于 AQS 的共享模式实现的。

ReentrantReadWriteLock 中的写锁是一把互斥锁,基于 AQS 的独占模式实现的。

源码分析

我们直到 AQS 只提供了一个 int 类型的 state 来表示锁的状态。现在我们有两把锁,同时还要表示锁的重入状态,该怎么办那?

设计者将这个 32 位 的 int 值给切分开,高 16 位代表读状态,低 16 为代表写状态。因为写锁是互斥的,所以需要用低 16 为来表示重入次数,所以写锁的重入次数最大为 65535 个。可能会有多个线程同时获得读锁,所以无法通过高 16 为同时记录真么多个线程的重入次数,所以使用了 ThreadLocl 来做这件事情。

ThreadLocal 的典型用处,此处用来记录线程获取锁的重入次数。

先看使用 ReentrantReadWriteLock 获取读写锁的方式,会调用 readLock() 和 writeLock() 两个方法。

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

再来看 WriteLock 和 ReadLock 两个静态内部类,它们对锁的实现如下:

public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1); //共享
    }

    public void unlock() {
        sync.releaseShared(1); //共享
    }
}

public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1); //独占
    }

    public void unlock() {
        sync.release(1); //独占
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {}

WriteLock

WriteLock 与 ReentrantLock 在获取锁的时候有区别,WriteLock 不但要考虑目前是否有写锁占用,同时还要考虑是否有读锁占用。

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //尝试获取独占锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取失败后排队
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();  //获取共享变量state
    int w = exclusiveCount(c); //获取写锁数量
    if (c != 0) { //有读锁或者写锁
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread()) //写锁为0(证明有读锁),或者持有写锁的线程不为当前线程,其实就是除了自己之外还有读锁或者写锁
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);  //当前线程持有写锁,为重入锁,增加重入锁的次数即可,因为当前线程持有锁,所以不需要 CAS。
        return true;
    }
    // 目前没有锁,直接使用 CAS 获取
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) //CAS操作失败,多线程情况下被抢占,获取锁失败。CAS成功则获取锁成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

释放:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())//当前线程不持有写锁
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases; //重入次数减少
    boolean free = exclusiveCount(nextc) == 0; //减少到0写锁释放
    if (free)
        setExclusiveOwnerThread(null); //写锁释放
    setState(nextc);
    return free;
}

ReadLock

我们再看看使用共享模式的 ReadLock 在获取锁的方式上有什么不同。

protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current) //写锁不等于0的情况下,验证是否是当前写锁尝试获取读锁
        return -1;
    int r = sharedCount(c);  //获取读锁数量
    if (!readerShouldBlock() && //读锁不需要阻塞
        r < MAX_COUNT &&  //读锁小于最大读锁数量
        compareAndSetState(c, c + SHARED_UNIT)) { //CAS操作尝试设置获取读锁 也就是高位加1
        if (r == 0) {  //当前线程第一个并且第一次获取读锁,说明此线程是第一个获取读锁的,或者说在它前面获取读锁的都走光光了,它也算是第一个吧
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //说明是 firstReader 重入获取读锁(这非常简单,count 加 1 结束)
            firstReaderHoldCount++;
        } else { // 当前线程不是第一个获取读锁的线程,更新 ThreadLocal
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  1. 只要没有写锁占用,读锁就可以尝试获取锁。

  2. ThreadLocalHoldCounter 是用来记录每个读锁的重入次数的。

    static final class ThreadLocalHoldCounter extends ThreadLocal { //ThreadLocal变量 public HoldCounter initialValue() { return new HoldCounter(); } }

    static final class HoldCounter { int count = 0; //当前线程持有锁的次数 // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); //当前线程ID }

释放:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//当前线程是读锁的第一个线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1) //第一次占有读锁 直接清除该线程
            firstReader = null;
        else
            firstReaderHoldCount--;//读锁的第一个线程重入次数减少
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();//读锁释放
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; //重入次数减少
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        //减少读锁的线程数量
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

锁降级

锁降级指的是写锁降级为读锁。首先持有当前写锁,然后获取到读锁,随后就释放掉写锁。ReentrantReadWriteLock 不支持锁升级,因为如果有多个线程获取到了读锁,其中任何一个获取到了写锁,修改了数据,其他的线程感知不到更新,这样就无法保证数据的可见性。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这