AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。
package com.abstractqueuesynchronizer;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class SelfAbstractQueueSynchronizer {
//继承AbstractQueuedSynchronizer类
private static class Syn extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
//是否拥有锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
//获取锁
public boolean tryAcquire(int acquires) {
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放所
protected boolean tryRelease(int releases) {
if(getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private final Syn syn = new Syn();
public void lock() {
syn.acquire(1);
}
public boolean tryLock() {
return syn.tryAcquire(1);
}
public void unlock() {
syn.release(1);
}
public boolean isLocked() {
return syn.isHeldExclusively();
}
}
测试
package com.abstractqueuesynchronizer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main {
private static CyclicBarrier barrier = new CyclicBarrier(31);
private static int a = 0;
private static SelfAbstractQueueSynchronizer test = new SelfAbstractQueueSynchronizer();
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
for(int i = 0; i < 30; i++) {
Thread t = new Thread(new Runnable(){
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
unlockIncrement();
}
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();;
}
}
});
t.start();
}
barrier.await();
System.out.println("unlock model a= " + a);
System.out.println("##########################");
barrier.reset();
a = 0;
for(int i = 0; i < 30; i++) {
Thread t = new Thread(new Runnable(){
@Override
public void run() {
for(int i = 0; i < 1000; i++ ) {
lockIncrement();
}
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
t.start();
}
barrier.await();
System.out.println("lock model a= " + a);
}
public static void unlockIncrement() {
a++;
}
public static void lockIncrement() {
test.lock();
a++;
test.unlock();
}
}