一般用ReentrantLock
的方式如下:
// 新建非公平锁实例
Lock lock = new ReentrantLock();
// 新建非公平锁实例
// Lock lock = new ReentrantLock(true);
// 加锁
lock.lock();
try {
// 业务逻辑
} catch (Exception e) {
e.printStackTrace();
} finally {
// 归还锁
lock.unlock();
}
先看看继承关系,没啥好说的。
image.png
锁创建过程
可以看到代码中调用构造方法时加上一个true
就可以创建公平锁。
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
在大多数情况下,锁的申请都是非公平的。也就是说,线程1和线程2都需要申请锁A,那么当锁A可用的时候,哪个线程获得锁呢?一般情况都是系统随机挑选一个,因此不能保证其公平性。而公平锁就不是这样的,按照先后顺序来,谁先申请,就谁先获得锁。公平锁带来的一个问题就是需要维护一个有序队列,性能相对低。所以默认情况下,锁都是非公平的。
其中公平锁和非公平锁的类图如下:
image.png
image.png
从图中可以看出 FairSync
和 NonfairSync
的区别,就是两者重写的 lock()
和 tryAcquire()
方法。
获取锁
看下ReentrantLock#lock
public void lock() {
sync.lock(); // 调用NonfairSync#lock或者FairSync#lock
}
概述
先简单概述一下公平锁和非公平锁获取锁的过程:
- 获取锁的过程就是设置
AbstractQueuedSynchronizer#state
属性的过程。 - 公平锁因为维护了一个队列,只有当前线程位于队列头部时,才能获取锁(设置
state
的状态)。 - 非公平锁就是一个不断插队的过程,不care现在队列是什么情况,抓住一切机会设置
state
的状态,设置成功了,就代表获得了锁,几次尝试插队失败以后,才加入到队尾,等待获得锁。
源码
NonfairSync
final void lock() { // 快速尝试将state从0设置成1 // state=0代表当前没有任何一个线程获得了锁 // Unsafe#compareAndSwapT系列compareAndSwapT(Object o, long offset, T expected, T x)代表 // 如果实例o的offset属性的值为expected时,将offset属性的值设置成x // 实例o的offset对应的是某个属性,通过反射获得某个属性的offset值 if (compareAndSetState(0, 1)) // state设置成1代表获得锁成功 // 将独占锁标识设置为当前线程,在重入锁的时候需要 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 父类实现,AbstractQueuedSynchronizer#acquire }
// AbstractQueuedSynchronizer#acquire public final void acquire(int arg) { // tryAcquire将调用子类的实现 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
// ReentrantLock.Sync#nonfairTryAcquire // 这又是一个尝试插队的过程 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();// 获取state值 if (c == 0) {// 当前没有任何一个线程获取锁 if (compareAndSetState(0, acquires)) {// 尝试将state的值从0设置成acquires(即1) setExclusiveOwnerThread(current);// 设置成功标识独占锁 return true; } } // 虽然state!=0,但是当前获取锁的是本线程 else if (current == getExclusiveOwnerThread()) { // 锁状态数量加上acquires,即锁状态数量加1 int nextc = c + acquires; // 重入次数太多,大过Integer.MAX if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
// AbstractQueuedSynchronizer#addWaiter private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 如果pred还是尾部(即没有被其他线程更新),则将尾部更新为node节点(即当前线程快速设置成了队尾) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速设置不成功,正常入队 enq(node); return node; }
private Node enq(final Node node) { // 在一个循环里不停的尝试将node节点插入到队尾里 for (;;) { Node t = tail; // 空队列,需要初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else {// 跟快速入队的逻辑一样 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 在一个循环里不断等待前驱节点执行完毕 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {// 通过tryAcquire获得锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 中断 if (shouldParkAfterFailedAcquire(p, node) && // 是否需要阻塞 parkAndCheckInterrupt()) // 阻塞,返回线程是否被中断 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// 确保当前结点的前驱结点的状态为SIGNAL,SIGNAL意味着线程释放锁后会唤醒后面阻塞的线程。毕竟,只有确保能够被唤醒,当前线程才能放心的阻塞。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 前驱节点状态为SIGNAL,表明当前线程需要阻塞,因为前置节点承诺执行完之后会通知唤醒当前节点 /* * This node has already set status asking a release * to signal it, so it can safely park. / return true; if (ws > 0) {// ws > 0代表前驱节点取消了 / * Predecessor was cancelled. Skip over predecessors and * indicate retry. / do { node.prev = pred = pred.prev;// 不断的把前驱取消了的节点移除队列 } while (pred.waitStatus > 0); pred.next = node; } else {// 初始化状态,将前驱节点的状态设置成SIGNAL / * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
// 从方法名可以看出这个方法做了两件事 private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//挂起当前的线程 // 如果当前线程已经被中断了,返回true,否则返回false // 有可能在挂起阶段被中断了 return Thread.interrupted(); }
FairSync
FairSync
相对来说就简单很多,只有重写的两个方法跟NonfairSync
不同。final void lock() { acquire(1); }
/**
- Fair version of tryAcquire. Don't grant access unless
- recursive call or no waiters or is first.
*/ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() &&// 没有前驱节点了 compareAndSetState(0, acquires)) {// 而且没有锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
总结
以非公平锁来说一下获取锁的过程:
- 尝试快速获取锁(本质就是插队);
- 快速获取不成功的话,调用acquire方法获取锁
2.1. 获取锁状态量state值,如果state=0,快速获取锁(本质也是插队)
2.2. 如果锁状态不为0,但是是当前线程获取到了锁,代表可重入,锁状态值state+1
2.3. 上述两步都失败,返回false,并将线程加入到队列中阻塞 - addWaiter主要作用就是将当前线程封装成Node实例,加入到队尾中
3.1. 快速入队列尾(本质还是插队)
3.2. 快速入队列尾失败,调用enq方法,在死循环里一直尝试加入队列尾 - 通过acquireQueued方法阻塞
4.1. 不停的通过循环等待当前节点变成队列头,并再次通过tryAcquire获取锁(又是插队)
4.2 不成功的话,判断是否需要阻塞 - shouldParkAfterFailedAcquire处理是否需要阻塞,只要当前节点的前驱节点中的其中某个节点waitStatus == -1即可
- parkAndCheckInterrupt方法负责阻塞,并且在阻塞结束后判断线程是否被中断了,如果被中断了,调用selfInterrupt方法。
释放锁
源码
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试快速释放
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;// 重入次数减少releases次(即1次)
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 完全释放锁
free = true;
setExclusiveOwnerThread(null);// 解除独占标识
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找到离head最近,而且waitStatus <= 0 的节点
// 其实在ReentrantLock中,waitStatus应该只能为0和-1,需要唤醒的都是-1(Node.SIGNAL)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);// 唤醒挂起线程
}