参考:http://ifeve.com/concurrency-cyclicbarrier/
http://blog.csdn.net/tolcf/article/details/50925145
1、实现多线程现在有3种方式
继承Thread类,实现Runnable接口,实现Callable接口
callable 方式可以有返回值
package com.lli.test.service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* Callable + FutureTask 实现多线程 有返回值
*
* @author lli
*
* @version 1.0
*
*/
public class ThreadByCallable implements Callable<String> {
@Override
public String call() throws InterruptedException {
System.out.println("当前线程名称是:" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(5);
return "OK";
}
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ThreadByCallable rt = new ThreadByCallable();
// 使用FutureTask来包装Callable对象
FutureTask<String> task = new FutureTask<String>(rt);
new Thread(task, "有返回值的线程").start();
// 获取线程返回值 task.get() 阻塞等待
System.out.println("子线程的返回值:" + task.get());
}
}
2、让主线程等待子线程执行完毕再执行
- 可以使用线程的Thread.join()方法 ,join()方法会阻塞主线程继续向下执行,不优雅
- Future.get()方法,也是阻塞等待 不优雅
- CountDownLatch(闭锁)
Join方式
join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远wait,代码片段如下,wait(0)表示永远等待下去。
while (isAlive()) {
wait(0);
}
直到join线程中止后,线程的this.notifyAll会被调用,调用notifyAll是在JVM里实现的,所以JDK里看不到,有兴趣的同学可以看看JVM源码。JDK不推荐在线程实例上使用wait,notify和notifyAll方法。
package com.lli.test.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MyThread extends Thread {
public MyThread(String name) {
this.setName(name);
}
@Override
public void run() {
System.out.println(this.getName() + ": 滴滴滴...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println("主线程开始");
List<MyThread> list = new ArrayList<MyThread>();
for (int i = 1; i <= 5; i++) {
MyThread my = new MyThread("线程 " + i);
my.start();
list.add(my);
}
// join方式主线程等待
try {
for (MyThread my : list) {
my.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程结束");
}
}
CountDownLatch方式
当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。
注意:CountDownLatch 传入的N和countDown方法调用次数要一致,否则会出现永远等待。
package com.lli.test.service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MyThread2 extends Thread {
private CountDownLatch count;
public MyThread2(CountDownLatch count, String name) {
this.count = count;
this.setName(name);
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName() + ": 滴滴滴...");
this.count.countDown();
}
public static void main(String[] args) {
System.out.println("主线程开始");
CountDownLatch count = new CountDownLatch(5);
// 这里可以改成线程池
for (int i = 1; i <= 5; i++) {
MyThread2 my = new MyThread2(count, "Thread " + i);
my.start();
}
// 主线程等待
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程结束");
}
}
3、同步屏障CyclicBarrier(栅栏)
栅栏是所有线程相互等待,直到所有线程都到达某一点时才打开栅栏,然后线程可以继续执行。
package com.lli.test.service;
import java.util.concurrent.CyclicBarrier;
/**
* 栅栏
*
* @author lli
*
* @version 1.0
*
*/
public class cyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有线程到达大闸,开闸放水");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new readNum(i, cyclicBarrier)).start();
}
// cyclicBarrier.reset(); 之后
// CyclicBarrier 可以重复利用,
// 这个是CountDownLatch做不到的
// for (int i = 11; i < 16; i++) {
// new Thread(new readNum(i,cyclicBarrier)).start();
// }
}
private static class readNum implements Runnable {
private int id;
private CyclicBarrier cyc;
public readNum(int id, CyclicBarrier cyc) {
this.id = id;
this.cyc = cyc;
}
@Override
public void run() {
System.out.println(String.format("线程%s:向大闸前进", id));
try {
cyc.await();
System.out.println(String.format("线程%s:通过大闸", id));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4、CountDownLatch和CyclicBarrier的区别
CountDownLatch : 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。
CyclicBarrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待, 而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。
CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
CountDownLatch
CyclicBarrier
减计数方式
加计数方式
计算为0时释放所有等待的线程
计数达到指定值时释放所有等待线程
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何
调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞
不可重复利用
调用reset方法可重复利用
1个构造方法
2个构造方法,达到栅栏,优先调用
CyclicBarrier(int parties, Runnable barrierAction)
传入线程
5、Exchanger两个线程数据交换
从javaDoc中我们可以总结如下几点:
1. Exchanger是一个同步类
2. 在而且只能在两个线程之间进行数据交换
3. 当一个线程到达exchange()调用点时,阻塞等待另外一个线程到达等待点,然后交换数据继续各自的执行
4. 可以看做是一个双向的同步队列
5. 应用于基因算法和流水线设计
package com.lli.test.service;
import java.util.concurrent.Exchanger;
public class ThreadLocalTest {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Consumer(exchanger).start();
new Producer(exchanger).start();
}
}
class Producer extends Thread {
Exchanger<String> exchanger = null;
public Producer(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("苹果生产者:我生产了一个苹果,发给你");
String result = exchanger.exchange("苹果");
System.out.println("苹果生产者:接收到了-" + result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Consumer extends Thread {
Exchanger<String> exchanger = null;
public Consumer(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("香蕉生产者:我生产了一个香蕉,发给你");
String result = exchanger.exchange("香蕉");
System.out.println("香蕉生产者:接收到了-" + result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
6、控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,如果信号量许可是1的话,可以作为同步锁来用。
应用场景
Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。
package com.lli.test.service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 30个人存钱,10个柜台
*
* @author lli
*
* @version 1.0
*
*/
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
if (s.availablePermits() > 0) {
System.out
.println("线程" + name + "启动,进入银行,有位置立即去存钱");
} else {
System.out.println("线程" + name
+ "启动,进入银行,无位置,去排队等待等待");
}
// 尝试获取许可 超时设置
s.tryAcquire(1, TimeUnit.MINUTES);
System.out.println(name + "开始存钱");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
} finally {
s.release();
System.out.println(name + "存钱完毕,离开柜台。");
}
}
});
}
threadPool.shutdown();
}
}