一:多线程实现方式
ThreadPoolExecutor线程池的类图
1.创建线程:
实现Runnable接口
@FunctionalInterface public interface Runnable { void run(); }
关于@FunctionalInterface注解
Java 8为函数式接口引入了一个新注解@FunctionalInterface,主要用于编译级错误检查,加上该注解,当你写的接口不符合函数式接口定义的时候,编译器会报错。
继承Thread:
public class Thread implements Runnable { //方法基本都是用native 修饰 ... }
2.线程执行有返回结果:
public interface ExecutorService extends Executor {
这里先说下参数Callable和Runnable
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
@FunctionalInterface public interface Runnable { void run(); }
相同点:
1、两者都是接口
2、两者都需要调用Thread.start启动线程
不同点:
1、callable的核心是call方法,允许返回值,runnable的核心是run方法,没有返回值
2、call方法可以抛出异常,但是run方法不行
3、callable和runnable都可以应用于executors。而thread类只支持runnable
返回值Future:
网上查找的关系图,不是类图:
public interface Future<V> { //用来取消任务,如果取消成功返回true,取消失败返回false,参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务 boolean cancel(boolean mayInterruptIfRunning); //判断是否被取消了 boolean isCancelled(); boolean isDone(); //获取线程执行返回结果 V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
现在先简单了解一下,这个是策略模式,
RunnableFuture
这个接口同时继承Future接口和Runnable接口,在成功执行run()方法后,可以通过Future访问执行结果。这个接口都实现类是FutureTask,一个可取消的异步计算,这个类提供了Future的基本实现,用这个类实现启动和取消一个计算,查询这个计算是否已完成,恢复计算结果。计算的结果只能在计算已经完成的情况下恢复。如果计算没有完成,get方法会阻塞,一旦计算完成,这个计算将不能被重启和取消,除非调用runAndReset方法。
FutureTask能用来包装一个Callable或Runnable对象,因为它实现了Runnable接口,而且它能被传递到Executor进行执行。为了提供单例类,这个类在创建自定义的工作类时提供了protected构造函数。
SchedualFuture
这个接口表示一个延时的行为可以被取消。通常一个安排好的future是定时任务SchedualedExecutorService的结果
CompleteFuture
一个Future类是显示的完成,而且能被用作一个完成等级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或取消操作时,只有一个能够成功。
ForkJoinTask
基于任务的抽象类,可以通过Fo价rkJoinPool来执行。一个ForkJoinTask是类似于线程实体,但是相对于线程实体是轻量级的。大量的任务和子任务会被ForkJoinPool池中的真实线程挂起来,以某些使用限制为代。
demo:
@Test public void testCallable() throws ExecutionException, InterruptedException { //之后会详细分析 ExecutorService executor= Executors.newFixedThreadPool(2); //创建线程对象 Callable callable=new Callable() { @Override public Object call() throws Exception { Thread.sleep(3000); System.out.println("call方法执行了"); return "call方法返回值"; } }; Future future=executor.submit(callable); System.out.println("提交任务之后,获取结果之前 "+getStringDate()); System.out.println("获取返回值"+future.get()); System.out.println("提交任务之后,获取结果之后 "+getStringDate()); } public String getStringDate() { Date currentTime = new Date(); SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss"); String dateString = formatter.format(currentTime); return dateString; }
输出:
提交任务之后,获取结果之前 23:20:32
call方法执行了
获取返回值call方法返回值
提交任务之后,获取结果之后 23:20:35
思考:
1.什么时候使用多线程:
当某段程序块耗时比较长,引起程序阻塞的时候,就可以考虑使用多线程优化
2.使用callable结合futureTask获取执行结果demo,模拟调用外部接口,单线程需要13s,多线程只要10s
@Test public void testCallable() throws InterruptedException {
//之后会详细分析线程池
ExecutorService executor= Executors.newFixedThreadPool(2);
//创建线程对象
Callable callable=new Callable() {
@Override
public Object call() throws Exception {
System.out.println("调用其他项目接口,假设耗时10s");
Thread.sleep(10000);
return "返回结果";
}
};
FutureTask
结果:
调用其他项目接口,假设耗时10s
执行其他业务逻辑
获取返回值返回结果
任务执行完毕
3.Future模式: 核心思想是异步调用。
打个比方:做早饭,我要吃面包和牛奶。烤面包需要2分钟,热牛奶需要1分钟。我先去烤面,然后途中去热牛奶,总时间花费2分钟。
4.FutureTask.get() 阻塞原理:
是通过LockSupport.park() 和 unpark() 实现。
LockSupport.park() 和 unpark() 功能类似于Thread.wait()和notify()但是LockSupport 更加灵活。
LockSupport理解:
https://www.jianshu.com/p/e3afe8ab8364
LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可
LockSupport优势:
(1)notify()必须在wait()后面,但是unpark()可以在park()前面。
(2)unpark可以唤醒指定的线程
这里还可以再深挖,但是因为这博客是为了准备面试,之后再来研究吧
2.线程池:
Executors:线程池ThreadPoolExecutor的工厂模式,有几个默认的线程池的模板。
(1)线程池中的几种重要的参数及流程说明。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
corePoolSize,线程池里最小线程数
maximumPoolSize,线程池里最大线程数量,超过最大线程时候会使用RejectedExecutionHandler
keepAliveTime,unit,线程最大的存活时间
workerQueue,缓存异步任务的队列
threadFactory,用来构造线程池里的worker线程
RejectedExecutionHandler:拒绝策略
(2)接下来先分析队列的实现,工厂的实现,拒绝策略的实现
a.队列:
- _直接提交。_工作队列的默认选项是 SynchronousQueue,在newCachedThreadPool中使用,newCachedThreadPool的核心线程数是0,每次提交都会直接创建一个线程,所以它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- _无界队列。_使用无界队列LinkedBlockingQueue,在newFixedThreadPool和newSingleThreadPool中使用,将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- _有界队列。_当使用有限的 maximumPoolSizes 时,有界队列ArrayBlockingQueue,有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
b工程类接口-ThreadFactory :给外界提供一个口子,可以在创建线程前后做一些事情或者设置属性。
c.拒绝策略:
线程池的拒绝策略 Abort策略:默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。
CallerRuns策略:调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者。不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。 由调用线程(提交任务的线程)处理该任务
Discard策略:新提交的任务被抛弃。
DiscardOldest策略: 丢弃队列最前面的任务,然后重新提交被拒绝的任务 (不适合工作队列为优先队列场景)
所有 BlockingQueue
都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
- 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
- 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
- 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
被拒绝的任务当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable)
中提交的新任务将被_拒绝_。在以上两种情况下,execute 方法都将调用其RejectedExecutionHandler
的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法。下面提供了四种预定义的处理程序策略:
- 在默认的
ThreadPoolExecutor.AbortPolicy
中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
。 - 在
ThreadPoolExecutor.CallerRunsPolicy
中,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 - 在
ThreadPoolExecutor.DiscardPolicy
中,不能执行的任务将被删除。 - 在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 RejectedExecutionHandler
类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。
线程池种类:
- newSingleThreadExecutor: 创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO,优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
public LinkedBlockingQueue() { //队列长度是Integer.MAX_VALUE this(Integer.MAX_VALUE); }
2.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
3. newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
//最大线程池是Integer.MAX_VALUE
return new ThreadPoolExecutor(0, Integer.MAX_VALUE****,
60L, TimeUnit.SECONDS,
new SynchronousQueue
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
这种类型的线程池特点是:
(1)工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程
(2)将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
(3)在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
4.newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
线程池工作原理
1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会执行拒绝策略
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 413 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 1720 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
3、线程池有什么作用?
(1)降低资源消耗,重复利用已创建的线程,降低创建线程和销毁线程的消耗
(2) 提高响应速度,任务到达是不需要等待线程创建就能立即执行
(3)提高线程可管理性,通过线程池的统一分配可以调优和jiankon
4、线程池都有哪几种工作队列?
3种,直接提交-SynchronousQueue,有界队列-ArrayBlockingQueue,无界队列-LinkedBlockingQueue
5、怎么理解无界队列和有界队列?
6.队列的相关知识?
7.阿里巴巴是不推荐使用Executors的工厂方式创建线程池的,原因是上面的也提到的,模板里面要么是最大线程数是Integer.MAX,要么就是队列的长度是Integer.MAX,这再内存低的时候很可能会出现oom。
8.重温下GC,oom,栈溢出等等内存相关的知识。
线程状态切换
Thread. Interrupt()方法真正作用不是为了中断线程,当线程正在运行的时候,调用interrupt()并不会中断线程,线程会照常运行,但是当线程处于阻塞状态的时候,会抛出异常 InterruptedException。此方法主要是为了发出一个中断信号,给正在运行的线程发送信号,然后正在运行的线程执行操作。
Lock 借类图看下java的锁机制
Lock接口:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; void unlock(); // 与Lock配合可以实现等待/通知模式,相当于 synchronized配合Object的wait()、notify()系列方法可以实现等待/通知模式 Condition newCondition(); }
Demo:
@Test public void testCondition(){ Lock lock=new ReentrantLock(); Condition condition=lock.newCondition(); ExecutorService executorService= Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @Override public void run() { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"拿到锁了"); System.out.println(Thread.currentThread().getName()+"等待信号"); condition.await(); System.out.println(Thread.currentThread().getName()+"拿到信号"); }catch (Exception e){
}finally {
lock.unlock();
} } }); executorService.submit(new Runnable() { @Override public void run() { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"拿到锁了"); System.out.println(Thread.currentThread().getName()+"发出信号"); condition.signal(); }catch (Exception e){
}finally {
lock.unlock();
} } }); }
结果:
pool-1-thread-1拿到锁了
pool-1-thread-1等待信号
pool-1-thread-2拿到锁了
pool-1-thread-2发出信号
pool-1-thread-1拿到信号
public class ReentrantLock implements Lock, java.io.Serializable { //同步器 private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { } static final class NonfairSync extends Sync { } static final class FairSync extends Sync { }
//默认是非公平锁 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }
由上面的ReentrantLock我们可以看到, Sync继承了AQS ,然后又有FairSync和NonfairSync继承Sync.。接下来引发的思考:
1.什么是AQS-AbstractQueuedSynchronizer
AQS实现了一个FIFO的队列。底层实现的数据结构是一个双向链表。 AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS维护的其实是内部的state的状态,state表示获取锁的线程数,state=0说明资源空闲,state=1 说明有现成获取了锁,state>1说明等待的进程数。
使用的是尾插法,如果是公平锁,直接插入到尾部,如果是非公平锁,会先去跟头节点竞争一次,如果竞争成功直接获取锁。
线程入队列/出队列过程,以公平锁的tryAcquire方法为例子:
//ture-加锁成功 false-加锁失败 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取state的状态值 int c = getState(); //如果当前state的状态是空闲-0的 if (c == 0) { //判断是否有等待队列 if (!hasQueuedPredecessors() && //用cas修改state的状态 compareAndSetState(0, acquires)) { //设置获取锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } //判断当前持有锁的线程是不是当前线程-这就是重入锁的概念 else if (current == getExclusiveOwnerThread()) { //加acquires,一般都是1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); //加锁成功 return true; } return false; }
从上面我们可以看出
PS:补充可重入锁的概念: 可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。就像上面的代码一样,如果当前线程就是持有锁的线程,那他还是可以加锁成功。
非公平锁加锁代码
final void lock() { //进来就去抢一下锁,如果抢到直接让当前线程持有锁,这个时间段是属于前一个线程释放锁,但是唤醒队列里面等待的线程需要一定的时间,这时候进来新的线程就可以抢占到锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //没抢到再去获取锁 acquire(1); }
PS:为什么默认使用非公平锁?
在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。
假设线程A持有一个锁,并且线程B请求这个锁。由于锁被A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此B会再次尝试获取这个锁。与此同时,如果线程C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样就是一种双赢的局面:B获得锁的时刻并没有推迟,C更早的获得了锁,并且吞吐量也提高了。
当持有锁的时间相对较长或者请求锁的平均时间间隔较长,应该使用公平锁。在这些情况下,插队带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)可能不会出现。
java 锁机制 : https://blog.csdn.net/u010648018/article/details/79750608
读写锁
读锁就是共享锁:
public static class ReadLock implements Lock, java.io.Serializable { 。。。 }
public void lock() { //这一段就是面向接口编程,也可以理解为策略模式的体现 sync.acquireShared(1); }
sync 是ReentrantReadWriteLock的内部类,继承了aqs,子类有公平锁FairSync和非公平锁NonfairSync
abstract static class Sync extends AbstractQueuedSynchronizer {}
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //没有独占锁且不是当前线程持有独占锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //共享锁的个数 int r = sharedCount(c); //readerShouldBlock见下面2. if (!readerShouldBlock() && r < MAX_COUNT && //cas更新值成功就获取读锁,读锁(高16位)+1 compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //自旋重复上面的操作 return fullTryAcquireShared(current); }
首先先补充一个概念:
1.state原本是32位int类型整数,将其划分为高16位和低16位。 高16位是读锁,低16位是写锁。这样就能理解下面的两段判断读写锁个数的代码了
EXCLUSIVE_MASK=16
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//得到的就是1111111111111111
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//就是c&1111111111111111,没锁就是0 state等于几这个结果就返回几
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
2.readerShouldBlock这里有两种实现:
(1)公平锁:只要有等待的队列,就返回true,否则返回false
final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
(2)非公平锁:
final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); }
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
这只是一种启发式地避免写锁无限等待的做法,它在遇到同步队列的head后继为写锁节点时,会让readerShouldBlock返回true代表新来的读锁(new reader)需要阻塞等待这个head后继。但只是一定概率下能起到作用,如果同步队列的head后继是一个读锁,之后才是写锁的话,readerShouldBlock就肯定会返回false了 。
写锁就是独占锁:
public void lock() { sync.acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && ---1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ---2 selfInterrupt(); }
----1 处tryAcquire-尝试加锁,有公平锁的实现和非公平锁的实现,
公平锁的实现:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //没有线程持有锁 if (c == 0) { //没有等待队列 if (!hasQueuedPredecessors() && //尝试替换状态成功 compareAndSetState(0, acquires)) { //当前线程持有锁 setExclusiveOwnerThread(current); return true; } } //判断当前线程是都持有锁的线程-重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非公平锁的实现:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
从上面我们可以看出,公平锁和非公平锁的尝试加锁方法,就相差一个hasQueuedPredecessors()的判断条件。
---2 acquireQueued 入栈出栈
private Node addWaiter(Node mode) { //将当前线程初始化为node节点,mode 是独占模式/共享模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //尾插法 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node的前节点,头结点就是持有锁的线程,之后的节点会一直自旋,等到cas置换成功之后,他就持有锁了 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
https://www.jianshu.com/p/bc85d6c51b70 总结。
独占锁与共享锁区别
1)独占功能:当锁被头节点获取后,只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程。
2)共享功能:只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,
每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能