Java - J.U.C体系进阶
作者:Kerwin
说到做到,就是我的忍道!
juc-sync 同步器框架
同步器名称
作用
CountDownLatch
倒数计数器,构造时设定计数值,当计数值归零后,所有阻塞线程恢复执行;其内部实现了AQS框架
CyclicBarrier
循环栅栏,构造时设定等待线程数,当所有线程都到达栅栏后,栅栏放行;其内部通过ReentrantLock和Condition实现同步
Semaphore
信号量,类似于“令牌”,用于控制共享资源的访问数量;其内部实现了AQS框架
Exchanger
交换器,类似于双向栅栏,用于线程之间的配对和数据交换;其内部根据并发情况有“单槽交换”和“多槽交换”之分
Phaser
多阶段栅栏,相当于CyclicBarrier的升级版,可用于分阶段任务的并发控制执行;其内部比较复杂,支持树形结构,以减少并发带来的竞争
CountDownLatch
注意:CountDownLatch和CyclicBarrier非常相似,且CyclicBarrier是可以重用的,根据具体的场景不同,代码结构不同,其实两者之间可以相互转化,详见CyclicBarrier模块,下文是CountDownLatch-Demo
// 用法比较简单,直接上代码即可
// 1.CountDownLatch的同一对象传递
// 2.构造参数的默认值需要指定
// 3.线程完成的countDown()->会使默认值减一
// 4.主线程awiw()等待,所有线程都countDown之后,主线程执行
// 应用场景:比如五个子线程文件输出导出数据,主线程等所有子线程都完成之后开始压缩操作,上传文件
public class TestCountDownLatch {
/*** * 关键点:面向对象的方式->参数传递,把CountDownLatch进行传递,使其共用同一个参数 * @param args */
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executorService = Executors.newCachedThreadPool();
MyWoker m1 = new MyWoker("work1", latch);
MyWoker m2 = new MyWoker("work2", latch);
MyWoker m3 = new MyWoker("work3", latch);
MyWoker m4 = new MyWoker("work4", latch);
MyWoker m5 = new MyWoker("work5", latch);
Boss boss = new Boss("boos", latch);
executorService.submit(m1);
executorService.submit(m2);
executorService.submit(m3);
executorService.submit(m4);
executorService.submit(m5);
executorService.submit(boss);
executorService.shutdown();
}
}
class MyWoker implements Callable<String> {
private String name;
private CountDownLatch latch;
public MyWoker (String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}
@Override
public String call() throws Exception {
System.out.println(name + " 工人开始工作");
int time = (int)(Math.random() * 100) * 50;
Thread.sleep(time);
System.out.println(name + " 工人已经完成任务!");
latch.countDown();
return "successful";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
}
class Boss implements Callable<String> {
private String name;
private CountDownLatch latch;
public Boss (String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}
@Override
public String call() throws Exception {
System.out.println("老板准备就绪,等工人都完成了就来视察~");
latch.await();
System.out.println("老板来了,快跑啊~");
return "successful";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
}
CyclicBarrier
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点
一句话概述就是:“人满发车”
重点理解:
CountDownLatch主要用于主线程阻塞,等待子线程执行完毕后,主线程执行,例如报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传
CyclicBarrier侧重点是人满发车,比如LOL,需要等待是个用户都加载好了之后,再开启主线程执行工作,值得注意的是,这是一般意义的CyclicBarrier
但是,CyclicBarrier提供了另一个构造方法,即可以指定默认额外的执行线程
CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService));
这意味着,在很多情况CyclicBarrier可以代替CountDownLatch,主要看代码的结构设计
比如刚才的问题:报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传
如果我用着这种构造方法,配合awit()的位置,让压缩上传线程默认作为最后执行的线程,即可保证执行的顺序,来看个Demo吧:
Demo-1 CyclicBarrier普通使用方法:
public class TestCyclicBarrier {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cycli = new CyclicBarrier(10);
for (int i = 0; i < 9; i++) {
new Thread(new BarrierThread("张" + i, cycli)).start();
}
Thread.sleep(3000);
new Thread(new BarrierThread("张" + 10, cycli)).start();
Thread.sleep(5000);
}
}
class BarrierThread implements Runnable{
private String name;
private CyclicBarrier cycli;
public BarrierThread(String name, CyclicBarrier cycli) {
super();
this.name = name;
this.cycli = cycli;
}
@Override
public void run() {
System.out.println(name + " 准备就绪");
try {
cycli.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " 开始执行");
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public CyclicBarrier getCycli() {
return cycli;
}
public void setCycli(CyclicBarrier cycli) {
this.cycli = cycli;
}
}
Demo-2 CyclicBarrier 另一种构造方法的使用:
注意 CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService));
再注意子线程中,awit等待的代码位置,这个代码位置在程序的最后面,因此CyclicBarrier 的灵活性完全可以由我们来把控,到底在哪一点阻塞,完全是我们自己控制的,这样的变化,就可以在一定程度上替代CountDownLatch,达到更加灵活的目的,CyclicBarrier 且是可重复使用的,细节可以再去深入了解
/** * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。 * * @author guangbo email:weigbo@163.com * */
public class Total {
// private ConcurrentHashMap result = new ConcurrentHashMap();
public static void main(String[] args) {
TotalService totalService = new TotalServiceImpl();
CyclicBarrier barrier = new CyclicBarrier(5,
new TotalTask(totalService));
// 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。
new BillTask(new BillServiceImpl(), barrier, "北京").start();
new BillTask(new BillServiceImpl(), barrier, "上海").start();
new BillTask(new BillServiceImpl(), barrier, "广西").start();
new BillTask(new BillServiceImpl(), barrier, "四川").start();
new BillTask(new BillServiceImpl(), barrier, "黑龙江").start();
}
}
/** * 主任务:汇总任务 */
class TotalTask implements Runnable {
private TotalService totalService;
TotalTask(TotalService totalService) {
this.totalService = totalService;
}
public void run() {
// 读取内存中各省的数据汇总,过程略。
totalService.count();
System.out.println("=======================================");
System.out.println("开始全国汇总");
}
}
/** * 子任务:计费任务 */
class BillTask extends Thread {
// 计费服务
private BillService billService;
private CyclicBarrier barrier;
// 代码,按省代码分类,各省数据库独立。
private String code;
BillTask(BillService billService, CyclicBarrier barrier, String code) {
this.billService = billService;
this.barrier = barrier;
this.code = code;
}
public void run() {
System.out.println("开始计算--" + code + "省--数据!");
billService.bill(code);
// 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略
System.out.println(code + "省已经计算完成,并通知汇总Service!");
try {
// 通知barrier已经完成
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
相关方法:
— getParties()
获取CyclicBarrier打开屏障的线程数量,也成为方数。
— getNumberWaiting()
获取正在CyclicBarrier上等待的线程数量。
—await()
—await(timeout,TimeUnit)
—isBroken()
获取是否破损标志位broken的值,此值有以下几种情况:
CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
—reset()使得CyclicBarrier回归初始状态,直观来看它做了两件事:
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
CountDownLatch和CyclicBarrier的主要联系和区别如下:
1.闭锁CountDownLatch做减计数,而栅栏CyclicBarrier则是加计数。
2.CountDownLatch是一次性的,CyclicBarrier可以重用。
3.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。
4.鉴于上面的描述,CyclicBarrier在一些场景中可以替代CountDownLatch实现类似的功能
Semaphore
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可
Semaphore用来控制并发线程数,但有个问题FixedThreadPool也可以控制最大并发数,那两者有何不一样呢?首先,量级来看,Semaphore轻量级,是一个并发工具类,线程池重量级无疑,其次特点来讲,Semaphore内的线程是我们实实在在自己创建的,FixedThreadPool是分配给我们的线程池里面的线程
另外,Semaphore如果默认大小为1的时候,还可以当作互斥锁使用,且有公平锁和非公平锁之分(是否按顺序执行,是则就是公平的,但是非常耗性能)
代码Demo,线程队列和Semaphore配合使用
public class TestQueeThread_2 {
static Semaphore semaphore = new Semaphore(1);
public static void main(String[] args) throws InterruptedException {
System.out.println("begin:" + (System.currentTimeMillis() / 1000));
BlockingQueue<String> myQueue = new ArrayBlockingQueue<String>(1);
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
String log = myQueue.take();
System.out.println(Thread.currentThread().getName() + ":" + doSome(log));
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for (int i = 0; i < 100; i++) { // 这行代码不能改动
String input = i + "";
myQueue.put(input);
}
}
public static String doSome(String input) {
String output = null;
try {
Thread.sleep(1000);
output = input + ":" + (System.currentTimeMillis() / 1000);
return output;
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
// 这种用法可能违背了oracle的本意,我们来看看oracle的官方Demo
// Semaphore是用来约束线程使用共享资源的,控制数据一致性,当然还得是锁
class Pool {
private static final int MAX_AVAILABLE = 100; // 可同时访问资源的最大线程数
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
protected Object[] items = new Object[MAX_AVAILABLE]; //共享资源
protected boolean[] used = new boolean[MAX_AVAILABLE];
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
Exchanger
Exchanger是用作线程并发协作的工具类,简单一句话讲,如果A,B线程都拥有Exchanger对象,如果某一个调用Exchanger的交换方法exchange时候,快的那个会主动等慢的那个(等的意思就是挂起),然后都到位之后,互相唤醒交换数据
代码Demo:
public class ExchangerTest {
static class Producer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger) {
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i < 5; i++) {
try {
TimeUnit.SECONDS.sleep(1);
data = i;
System.out.println(getName() + " 交换前:" + data);
data = exchanger.exchange(data);
System.out.println(getName() + " 交换后:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger) {
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
data = 0;
System.out.println(getName() + " 交换前:" + data);
try {
TimeUnit.SECONDS.sleep(1);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " 交换后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
TimeUnit.SECONDS.sleep(7);
System.exit(-1);
}
}
结果打印:
Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Producer- 交换后:0
Consumer- 交换前:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0
我暂时没有碰到用需要此特点的需求,不过很显然它可以用作生产者消费者模式,通过网上的解释来看,Exchanger的实现是非常复杂的,主要是依赖CAS自旋操作
Phase
Phaser 是一个多栅栏的同步工具
phase(阶段) - Phaser也有栅栏,在Phaser中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser只处于某一个phase(阶段),初始阶段为0,最大达到Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增
parties(参与者) - 其实就是CyclicBarrier中的参与线程的概念,CyclicBarrier中的参与者在初始构造指定后就不能变更,而Phaser既可以在初始构造时指定参与者的数量,也可以中途通过register、bulkRegister、arriveAndDeregister等方法注册/注销参与者
arrive(到达) / advance(进阶) - Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance)——也就是phase值+1
public class SwimmerTest {
// 游泳选手个数
private static int swimmerNum = 6;
public static void main(String[] args) {
Phaser phaser = new Phaser(7){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
return phase >= 2 || registeredParties == 0;
}
};
for (int i = 0; i < swimmerNum; i++) {
new Thread(new Swimmer(phaser), "swimmer" + i).start();
}
// 主线程到达,开启第二阶段
phaser.arriveAndAwaitAdvance();
// 主线程销毁,开启第三阶段
phaser.arriveAndDeregister();
// 加while是为了防止其它线程没结束就打印了"比赛结束"
while (!phaser.isTerminated()) {}
System.out.println("===== 比赛结束 =====");
}
}
class Swimmer implements Runnable {
private Phaser phaser;
public Swimmer(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
// 从这里到第一个phaser.arriveAndAwaitAdvance()是第一阶段做的事
System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已到达赛场");
phaser.arriveAndAwaitAdvance();
// 从这里到第二个phaser.arriveAndAwaitAdvance()是第二阶段做的事
System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已准备好");
phaser.arriveAndAwaitAdvance();
// 从这里到第三个phaser.arriveAndAwaitAdvance()是第三阶段做的事
System.out.println("游泳选手-" + Thread.currentThread().getName() + ":完成比赛");
phaser.arriveAndAwaitAdvance();
}
}
上文说到,Phase阶段概念,且注册数量和到达数量一致的之后,就会进入下一个阶段,代码中即是如此,一开始注册七个指标,游泳子线程会运行到达6个,然后由主线程控制到达,进入到下一个阶段,注意的是参与的线程可以注册也可以销毁,所以主线程阶段二是到达,阶段三测试了销毁
Phaser 的onAdvance方法:
Phaser phaser = new Phaser(7){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
return phase >= 2 || registeredParties == 0;
}
};
当前阶段,最后一个线程到达后,会触发onAdvance方法,此处是打印了信息,且写明了Phaser终止的标志,注册线程数为0或阶段数到达2 (0,1,2)