AQS共享锁应用之Semaphore原理

Stella981
• 阅读 788

我们调用Semaphore方法时,其实是在间接调用其内部类或AQS方法执行的。Semaphore类结构与ReetrantLock类相似,内部类Sync继承自AQS,然后其子类FairSync和NoFairSync分别实现公平锁和非公平锁的获取锁方法tryAcquireShared(int arg),而释放锁的tryReleaseShared(int arg)方法则有Sync类实现,因为非公平或公平锁的释放过程都是相同的。

AQS通过state值来控制对共享资源访问的线程数,有线程请求同步状态成功state值减1,若超过共享资源数量获取同步状态失败,则将线程封装共享模式的Node结点加入到同步队列等待。有线程执行完任务释放同步状态后,state值会增加1,同步队列中的线程才有机会获得执行权。公平锁与非公平锁不同在于公平锁申请获取同步状态前都会先判断同步队列中释放存在Node,若有则将当前线程封装成Node结点入队,从而保证按FIFO的方式获取同步状态,而非公平锁则可以直接通过竞争获取线程执行权。

//Semaphore的acquire()
public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
  }

/**
*  注意Sync类继承自AQS
*  AQS的acquireSharedInterruptibly()方法
*/ 
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //判断是否中断请求
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果tryAcquireShared(arg)不小于0,则线程获取同步状态成功
    if (tryAcquireShared(arg) < 0)
        //未获取成功加入同步队列等待
        doAcquireSharedInterruptibly(arg);
}
//Semaphore中非公平锁NonfairSync的tryAcquireShared()
protected int tryAcquireShared(int acquires) {
    //调用了父类Sync中的实现方法
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
         //使用死循环
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             //判断信号量是否已小于0或者CAS执行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //创建共享模式的结点Node.SHARED,并加入同步队列
   final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         //进入自旋操作
         for (;;) {
             final Node p = node.predecessor();
             //判断前驱结点是否为head
             if (p == head) {
                 //尝试获取同步状态
                 int r = tryAcquireShared(arg);
                 //如果r>0 说明获取同步状态成功
                 if (r >= 0) {
                     //将当前线程结点设置为头结点并传播               
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
           //调整同步队列中node结点的状态并判断是否应该被挂起
           //并判断是否需要被中断,如果中断直接抛出异常,当前结点请求也就结束
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             //结束该结点线程的请求
             cancelAcquire(node);
     }
    }

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前结点的等待状态
        int ws = pred.waitStatus;
        //如果为等待唤醒(SIGNAL)状态则返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 则说明是结束状态,
        //遍历前驱结点直到找到没有结束状态的结点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小于0又不是SIGNAL状态,
            //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        //并非中断线程,因此可能true也可能false,并返回
        return Thread.interrupted();
}

//不可中的acquireShared()
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //没有抛出异常中的。。。。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//设置为头结点
        /* 
         * 尝试去唤醒队列中的下一个节点,如果满足如下条件: 
         * 调用者明确表示"传递"(propagate > 0), 
         * 或者h.waitStatus为PROPAGATE(被上一个操作设置) 
         * 并且 
         *   下一个节点处于共享模式或者为null。 
         * 
         * 这两项检查中的保守主义可能会导致不必要的唤醒,但只有在有
         * 有在多个线程争取获得/释放同步状态时才会发生,所以大多
         * 数情况下会立马获得需要的信号
         */  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态
                doReleaseShared();
        }
    }

//Semaphore的release()
public void release() {
       sync.releaseShared(1);
}

//调用到AQS中的releaseShared(int arg) 
public final boolean releaseShared(int arg) {
       //调用子类Semaphore实现的tryReleaseShared方法尝试释放同步状态
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
  }

//在Semaphore的内部类Sync中实现的
protected final boolean tryReleaseShared(int releases) {
       for (;;) {
              //获取当前state
             int current = getState();
             //释放状态state增加releases
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error("Maximum permit count exceeded");
              //通过CAS更新state的值
             if (compareAndSetState(current, next))
                 return true;
         }
        }

private void doReleaseShared() {
    /* 
     * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的  
     * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤醒  
     * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证   
     * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必  
     * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样  
     * 的是,如果(头结点)等待状态设置失败,重新检测。 
     */  
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // 获取头节点对应的线程的状态
            int ws = h.waitStatus;
            // 如果头节点对应的线程是SIGNAL状态,则意味着头
            //结点的后继结点所对应的线程需要被unpark唤醒。
            if (ws == Node.SIGNAL) {
                // 修改头结点对应的线程状态设置为0。失败的话,则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒头结点h的后继结点所对应的线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头结点发生变化,则继续循环。否则,退出循环。
        if (h == head)                   // loop if head changed
            break;
    }
}


//唤醒传入结点的后继结点对应的线程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
       //拿到后继结点
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          //唤醒该线程
          LockSupport.unpark(s.thread);
    }

 剖析基于并发AQS的共享锁的实现(基于信号量Semaphore)

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
灯灯灯灯 灯灯灯灯
3年前
面试百度和美团,竟然问我多线程安全问题,正好撞在我知识点上
解决多线程安全问题无非两个方法synchronized和lock具体原理以及如何获取锁AQS算法本篇文章主要讲了lock的原理就是AQS算法,还有个姊妹篇讲解synchronized的实现原理也是阿里经常问的,一定要看后面的文章,先说结论:非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查
Wesley13 Wesley13
3年前
java中的锁
记录一下公平锁,非公平锁,可重入锁(递归锁),读写锁,自旋锁的概念,以及一些和锁有关的java类。公平锁与非公平锁:公平锁就是在多线程环境下,每个线程在获取锁时,先查看这个锁维护的队列,如果队列为空或者自身就是等待队列的第一个,就占有锁。否则就加入到等待队列中,按照FIFO的顺序依次占有锁。非公平锁会一上来就试图占
Wesley13 Wesley13
3年前
java并发相关(四)——关于synchronized的可重入性,线程切换实现原理与是否公平锁
一、可重入性  关于synchronized的可重入性的证明,我们可以通过A类内写两个同步方法syncA(),syncB()。然后syncA内调用syncB,调用syncA发现代码可正常执行,来证明这一点。  当处于无锁阶段时,划掉,都重入了不可能处于无锁。  当处于偏向锁阶段时,由之前对偏向锁的解释可知,偏向当前线程id是,当前线程可直
Wesley13 Wesley13
3年前
java 面试知识点笔记(十二)多线程与并发
问:synchronized和ReentrantLock的区别?ReentrantLock(可重入锁)位于java.util.concurrent.locks包(著名的juc包是由Douglea大神写的AQS抽象类框架衍生出来的应用)和CountDownLatch、FutureTask、Semaphore一样基于AQS实现
Wesley13 Wesley13
3年前
Java并发包小结
1、Lock  Lock功能对应关键字synchrozied功能,lock和unlock方法用于加锁和释放锁。等待锁的线程加入到等待链表中,同时阻塞线程,锁释放时,从等待链表中取出等待的线程执行,取等待的线程分公平与非公平两种方式,公平方式取第一个等待的线程,非公平方式当前正在获取锁的线程可能立刻执行,而不用加入到等待队列中,排队执行。2、Con
Wesley13 Wesley13
3年前
AQS之工作原理
前面一章LZ简单的介绍了下AbstractQueuedSynchronizer(AQS)以及AQS中提供的一些模板方法和作用,这一章LZ将用一个简单的实例来介绍下AQS中独占锁的工作原理。独占锁顾名思义就是在同一时刻只能有一个线程能获取到锁,而其它需要获取这把锁的线程将进入到同步队列中等待获取到了锁的线程释放这把锁,只有获取锁的线程释放了锁,同步队列中的线程
Wesley13 Wesley13
3年前
Java锁机制浅析之 AQS
  一、内部原理  类继承结构  Lockpackage相关API继承结构,忽略掉了一些类,以便观察其特点:  ReentrantLock和ReentrantReadWriteLock都是借助内部类Sync来实现Lock接口。ReentrantReadWriteLock没有直接实现Lock接口而是内置了读锁ReadLock和写锁Write