Java.Utril.Concurrent
Volatile关键字
避免java虚拟机指令重排序,保证共享数据修改同步,数据可见性。volatile相较于synchronized是一种比较轻量级地同步策略,但不具备互斥性,不能成为synchronized的替代,不能保证原子性。
###示例
package com.wang.test.juc.test_volatile;
public class TestVolatile {
public static void main(String[] args) {
TestThread testThread = new TestThread();
new Thread(testThread).start();
while (true)//在线程运行后,读取线程中的flag值
{
if(testThread.isFlag()){
System.out.println(" get Flag success! ");
break;
}
}
}
}
class TestThread implements Runnable{
private boolean flag = false;
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
public void run(){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag:" + flag);
}
}
###分析
主线程中的读取操作看似在线程执行后,但并发地执行,在线程更改数据之前,主线程已经读取了数据,共享地数据flag不同步。
###修改
public class TestVolatile {
public static void main(String[] args) {
TestThread testThread = new TestThread();
new Thread(testThread).start();
while (true)//在线程运行后,读取线程中的flag值
{
if(testThread.isFlag()){
System.out.println(" get Flag success! ");
break;
}
}
}
}
class TestThread implements Runnable{
private volatile boolean flag = false;
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
public void run(){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag:" + flag);
}
}
在Flag属性上添加volatile主方法即可读取正确值。
原子性
###示例
package com.wang.test.juc.test_volatile;
public class TestAtomic {
public static void main(String[] args) {
Atomic atomic = new Atomic();
Thread t1 = new Thread(atomic);
Thread t2 = new Thread(atomic);
t1.start();
t2.start();
}
}
class Atomic implements Runnable{
private int i =0;
public void run() {
while(true){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+": "+getCountI());
}
}
public int getCountI()
{
return i++;
}
}
###分析
由于线程中操作i++不具备原子性,线程执行中,数据i的递增会出现重复问题,即i的值不会正常递增,会出现线程做出同样操作的情况。此时因为这个操作不是原子的,使用volitale修饰不能解决同步问题。
原子类
CAS算法
Compare-And-Swap算法时硬件对于并发操作共享数据的支持,包含三个操作数,内存值V、预估值A、更新值B,只有 V == A 时,V = B执行,否则不进行任何操作。
修改程序
import java.util.concurrent.atomic.AtomicInteger;
public class TestAtomic {
public static void main(String[] args) {
Atomic atomic = new Atomic();
Thread t1 = new Thread(atomic);
Thread t2 = new Thread(atomic);
t1.start();
t2.start();
}
}
class Atomic implements Runnable{
private AtomicInteger i = new AtomicInteger(0);
public void run() {
while(true){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+": "+getCountI());
}
}
public int getCountI()
{
return i.addAndGet(1);
}
}
此时线程中i已经是一个原子类型,那么在对数据进行操作的时候是具备原子性的,所有线程在执行i自增时具有源自性,解决了并发问题。
ConcurrentHashMap
HashTable是线程安全的,在访问HashTable时会加上表锁,将操作转为串行,不允许有空值,效率比较低。
CuncurrentHashMap是线程安全的HashMap,采用锁分段机制,每个数据段都是独立的锁,在访问时,可以并行执行,提高效率
###其他
ConcurrentSkipListMap:同步的TreeMap
CopyOnWriteArrayList:同步的ArrayList (读取和遍历大于更新)
Collections.synchronizedList(new ArrayList(String))
闭锁
CountDownLatch为同步辅助类,在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一直等待。
###示例
import java.util.concurrent.CountDownLatch;
public class TestCountDownLatch {
public static void main(String[] args) {
final CountDownLatch countDownLatch = new CountDownLatch(2);
//锁值为2
LatchLock latchLock = new LatchLock(countDownLatch);
long start = System.currentTimeMillis();
Thread t1 = new Thread(latchLock);
Thread t2 = new Thread(latchLock);
t1.start();
t2.start();
try {
countDownLatch.await();//锁不为0 主线程等待
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("Time = [" + (end - start) + "mms"+"]");
}
}
class LatchLock implements Runnable{
private CountDownLatch countDownLatch;
public LatchLock(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
public void run() {
synchronized (this){
try {
for(int i=0;i<1000;i++)
{
System.out.println(Thread.currentThread().getName()+": "+i);
}
}finally {
countDownLatch.countDown();//线程执行一次锁减一
}
}
}
}
Callable接口
此接口相较于Runnable接口,可以返回值和抛异常。
###示例
public class TestCallable {
public static void main(String[] args) {
ThreadCallable testCallable = new ThreadCallable();
//执行Callable,需要使用FutureTask 实现类用于接收结果
FutureTask<Integer> futureTask = new FutureTask(testCallable);
Thread thread = new Thread(futureTask);
thread.start();
Integer result = 0;
try {
result = futureTask.get();
//此方法将在线程执行结束后才会执行
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("result = [" + result + "]");
}
}
class ThreadCallable implements Callable<Integer>{
public Integer call() throws Exception {
int sum = 0;
for (int i=0;i<100;i++) {
sum+=i;
System.out.println("i: "+i);
}
return sum;
}
}
Lock锁
在解决同步问题时,采用synchronized关键字给代码块加锁或者给方法加锁,关键字加锁方式时隐式的,所的获取和释放由执行过程中的线程自行完成,需要显式地完成加锁和锁释放时,可以使用lock加锁方式。
示例
package com.wang.test.juc.cchm;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock {
public static void main(String[] args) {
TestThread testThread = new TestThread();
Thread t1 = new Thread(testThread);
Thread t2 = new Thread(testThread);
Thread t3 = new Thread(testThread);
t1.start();
t2.start();
t3.start();
}
}
class TestThread implements Runnable{
private Lock lock = new ReentrantLock();//锁
private int count = 1000;
public void run() {
while (true){ // 自旋等待!
lock.lock();
try {
Thread.sleep(1);
if (count > 0)
System.out.println(Thread.currentThread().getName()+" count :"+ --count);
} catch (InterruptedException e) {
e.printStackTrace();
}finally { // finally释放锁!
lock.unlock();
}
}
}
}
等待唤醒
###示例-生产者消费者
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Producter").start();
new Thread(consumer,"Customer").start();
}
}
class Clerk{
private int pruduct = 0;
public synchronized void income(){
if (pruduct >= 10){
System.out.println("Can not add more");
}else{
System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
}
}
public synchronized void sale(){
if (pruduct <= 0) {System.out.println("Can not sale anything!");}
else {
System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
}
}
}
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk = clerk;
}
public void run(){
for (int i=0;i<20;i++){
clerk.income();
}
}
}
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i=0;i<20;i++){
clerk.sale();
}
}
}
###分析
生产者消费者都会一直进行,会出现没有产品继续消费和库存已满继续生产,即没有货物依旧被多次消费,无法库存仍旧多次生产。
###改进
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Producter").start();
new Thread(consumer,"Customer").start();
}
}
class Clerk{
private int pruduct = 0;
public synchronized void income(){
if (pruduct >= 10){
System.out.println("Can not add more");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
this.notifyAll();
}
}
public synchronized void sale(){
if (pruduct <= 0) {System.out.println("Can not sale anything!");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else {
System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
this.notifyAll();
}
}
}
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk = clerk;
}
public void run(){
for (int i=0;i<20;i++){
clerk.income();
}
}
}
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i=0;i<20;i++){
clerk.sale();
}
}
}
等待唤醒,当发生满货或是销空时,进行等待。以上代码无法结束,最后一次地等待,无法被唤醒,由else引发,继续增加生产者和消费者,将会出现虚假唤醒,必须让它自旋等待。
改进 2.0
package com.wang.test.juc.cchm;
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Producter").start();
new Thread(consumer,"Customer").start();
new Thread(productor,"Producter2").start();
new Thread(consumer,"Customer2").start();
}
}
class Clerk{
private int pruduct = 0;
public synchronized void income(){
while (pruduct >= 10){//自旋
System.out.println("Can not add more");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
this.notifyAll();
}
public synchronized void sale(){
while (pruduct <= 0) {System.out.println("Can not sale anything!");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
this.notifyAll();
}
}
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk = clerk;
}
public void run(){
for (int i=0;i<20;i++){
clerk.income();
}
}
}
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i=0;i<20;i++){
clerk.sale();
}
}
}
同步锁
生产消费模型
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Producter").start();
new Thread(consumer,"Customer").start();
new Thread(productor,"Producter2").start();
new Thread(consumer,"Customer2").start();
}
}
class Clerk{
private int pruduct = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//!!!!
public void income(){
lock.lock();
try {
while (pruduct >= 10){
System.out.println("Can not add more");
try {
condition.await();
//!!!!!
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+": "+ ++pruduct);
condition.signalAll();
//!!!!
}finally {
lock.unlock();
}
}
public void sale(){
lock.lock();
try {
while (pruduct <= 0) {System.out.println("Can not sale anything!");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" :"+ --pruduct);
condition.signalAll();
}finally {
lock.unlock();
}
}
}
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk = clerk;
}
public void run(){
for (int i=0;i<20;i++){
clerk.income();
}
}
}
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i=0;i<20;i++){
clerk.sale();
}
}
}
示例交替打印
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestPrintInOrder {
public static void main(String[] args) {
final Alternate alternate = new Alternate();
new Thread(new Runnable() {
public void run() {
for (int i=0;i<20;i++){
alternate.PrintA();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
for (int i=0;i<20;i++){
alternate.PrintB();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
for (int i=0;i<20;i++){
alternate.PrintC();
}
}
}).start();
}
}
class Alternate{
private int mark = 1;
private Lock lock = new ReentrantLock();
private Condition c1 =lock.newCondition();
private Condition c2 =lock.newCondition();
private Condition c3 =lock.newCondition();
public void PrintA(){
lock.lock();
try{
while (mark != 1){
c1.await();
}
System.out.println(Thread.currentThread().getName()+": "+"A");
mark = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void PrintB(){
lock.lock();
try{
while (mark != 2){
c2.await();
}
System.out.println(Thread.currentThread().getName()+": "+"B");
mark = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void PrintC(){
lock.lock();
try{
while (mark != 3){
c3.await();
}
System.out.println(Thread.currentThread().getName()+": "+"C");
mark = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
读写锁
当数据在进行写入时,读取操作需要保持同步,即读写应当时互斥的,读取锁可以共享,写入锁独占。
###示例
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestReadWriteLock {
public static void main(String[] args) {
final TestLockRW testLockRW = new TestLockRW();
new Thread(new Runnable() {
public void run() {
testLockRW.write((int)(Math.random()*1000));
}
}).start();
for (int i=0;i<20;i++){
new Thread(new Runnable() {
public void run() {
testLockRW.read();
}
}).start();
}
}
}
class TestLockRW{
private int i = 0;
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void read(){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+": "+i);
}finally {
readWriteLock.readLock().unlock();
}
}
public void write(int random){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+": write "+random);
i = random;
}finally {
readWriteLock.writeLock().unlock();
}
}
}
##线程八锁
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
public void run() {
number2.getTwo();
}
}).start();
// new Thread(new Runnable() {
// public void run() {
// number.getThree();
// }
// }).start();
}
}
class Number{
public static synchronized void getOne(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("One");
}
public static synchronized void getTwo(){
System.out.println("Two");
}
public void getThree(){
System.out.println("Three");
}
}
- 非静态方法的锁默认为this
- 静态方法的锁为Class实例
- 在某时刻内,只有一个线程拿到锁。
线程池
类比数据库连接池,创建线程和销毁线程比较浪费资源,建立一个线程池,线程池提供一个线程队列,队列中保存着所有等待状态的线程,在需要使用线程时直接在线程池中获取,使用完毕后,归还给线程池,提高相应速度。
体系结构
java.util.concurrent.Executor
|--ExecutorService 线程池主要接口
|--ThreadPoolExecutor 线程池实现类
|--ScheduleExecutorService 线程调度接口
|--ScheduledThreadPoolExecutor 继承线程池实现调度接口
使用方法:
工具类:Executors
Executors.ewCachedThreadPool() 数量不固定,动态更改数量
Executors.newFixedThreadPool(int) 固定容量
Executors.newSingleThreadExecutor() 单个线程线程池
返回值类型为ExecurotService
ScheduledThreadPoolExecutor 线程调度
静态方法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
ThreadPoolimp threadPoolimp = new ThreadPoolimp();
for ( int i = 0;i<1000;i++){
executorService.submit(threadPoolimp);
//支持多种线程初始化参数 Runnable、Callable...
}
executorService.shutdown();
//new Thread(new ThreadPoolimp()).start();
}
}
class ThreadPoolimp implements Runnable{
private int i = 0;
public void run() {
while(true){
System.out.println(Thread.currentThread().getName()+" :"+ ++i);
}
}
}
public class TestScheduledThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
Future<Integer> future = pool.schedule(new Callable<Integer>() {
public Integer call() throws Exception {
int i =1;
System.out.println(Thread.currentThread().getName());
return i;
}
},5, TimeUnit.SECONDS);//延迟时间 时间单位
System.out.println(future.get());
pool.shutdown();
}
}
分支合并框架
在必要的情况下,将一个大人物,进行拆分,拆分成若干的小人物,再将一个个小任务的运算结果进行汇总。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestForkJoinPool {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkSun forkSun = new ForkSun(0L, 1000000000L);
Long sum = pool.invoke(forkSun);
System.out.println("sum = [" + sum + "]");
}
}
class ForkSun extends RecursiveTask<Long>{
private static final long serialVersionUID = 7430797084800536110L;
private long start;
private long end;
private static final long max = 10000l;
public ForkSun(long start, long end) {
this.start = start;
this.end = end;
}
protected Long compute() {
long len = end - start;
if(len <= max){
long sum = 0L;
for(long i = start;i<=end;i++)
{
sum+=i;
}
return sum;
}else {
long middle = (start + end)/2;
ForkSun left = new ForkSun(start,middle);
left.fork();
ForkSun right = new ForkSun(middle+1,end);
right.fork();
return left.join() + right.join();
}
}
}