在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁ReentrantReadWriteLock便适用于这种场景。
再描述一下进入读锁和写锁的条件。
进入读锁:
1.没有其他线程的写锁
2.有写请求且请求线程就是持有锁的线程
进入写锁:
1.没有其他线程读锁
2.没有其他线程写锁
本篇从源码方面,简要分析ReentrantReadWriteLock的实现原理,以及展示一下它的使用效果。
源码
这是ReentrantReadWriteLock维护的一对锁
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
ReentrantReadWriteLock的构造器中,同时实例化读写锁,同时与ReentrantLock相同,也有公平锁和非公平锁之分
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
写锁
获取锁
public void lock() {
sync.acquire(1);
}
//这里与ReentrantLock相同
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
这里解析tryAcquire()方法。
- 获取当前线程
- 获取状态
- 获取写线程数
- 若state不为0,表示锁已被持有。再判断,如果写线程数为0,则读锁被占用,返回false;如果写线程数不为0,且独占线程不是当前线程,表示写锁被其他线程占用没返回false
- 如果写锁重入数大于最大值MAX_COUNT,抛错
- 写锁重入,返回true
- state为0,根据公平锁还是非公平锁判断是否阻塞线程。不需要阻塞就CAS更新state
- 当前线程设为独占线程,获取写锁,返回true
释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
分析tryRelease()方法
- 判断持有写锁的线程是否当前线程,不是则抛错
- state减1
- 以新state计算写锁数量,如果为0,表示完全释放;
- 完全释放就设置独占线程为null
- 如果独占线程数量不是0,还是更新state,这里就表示多次重入写锁后,释放了一次
读锁
获取锁
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这里分析tryAcquireShared()方法
- 获取当前线程
- 获取state
- 如果写锁数量不为0,且独占线程不是本线程,获得读锁失败。因为写锁被其他线程占用
- 获取读锁数量
- 根据公平锁或者非公平锁判断是否应该被阻塞,判断读锁数量是否小于最大值MAX_COUNT,再尝试CAS更新state
- 以上判断都通过且更新state也成功后,如果读锁为0,记录第一个读线程和此线程占用读锁数量
- 如果第一个读线程是本线程,表示此时是读锁的重入,则把此线程占用读锁数量+1
- 如果读锁数量不为0,且此线程也不是第一个读线程,则找到当前线程的计数器,并计数+1
- 如果在阻塞判断,读锁数量判断和CAS更新是否成功这部分没有通过,则进入fullTryAcquireShared()方法,逻辑与上面的获取类似,以无限循环方式保证操作成功,不赘述。
释放锁
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
分析tryReleaseShared()方法
- 获取当前线程
- 如果当前线程是第一个读线程,则释放firstReader或者第一个读线程的锁计数-1
- 不是就获得当前线程的计数器。根据计数选择删除此计数器或者减少计数
- 无限循环更新state
获取锁和释放锁的源码部分代码就分析放到这里,接下来用代码时间看看ReentrantReadWriteLock的使用效果测试。
public class ReadWriteLockTest {
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static ExecutorService executorService = Executors.newCachedThreadPool();
//读操作
public static void read(){
try { //加读锁
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + " is reading " + System.currentTimeMillis());
Thread.sleep(1000);
} catch (InterruptedException e){
}finally {
readWriteLock.readLock().unlock();
}
}
//写操作
public static void write() {
try { //加写锁
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + " is writing "+ System.currentTimeMillis());
Thread.sleep(1000);
} catch (InterruptedException e){
}finally {
readWriteLock.writeLock().unlock();
}
}
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
ReadWriteLockTest.read();
}
});
}
for (int i = 0; i < 3; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
ReadWriteLockTest.write();
}
});
}
}
}
执行结果如下:
pool-1-thread-2 is reading 1549002279198
pool-1-thread-1 is reading 1549002279198
pool-1-thread-3 is reading 1549002279198
pool-1-thread-4 is writing 1549002280208
pool-1-thread-5 is writing 1549002281214
pool-1-thread-6 is writing 1549002282224
可以看到,thread1,2,3在读时,是同时执行。thread4,5,6在写操作是,都差不多间隔1000毫秒。