##
当然多线程并发有很多解决方案。从硬件上面,可以通过分布式集群。从而将同一时间的服务并发压力分离到集群的每个实体中。还有从应用层面进行控制,比如对某个接口进行限流,通过控制某一时刻该接口所能承受的并发线程数量,这种模式可以通过Queue的模式来实现(这里只列举出此时本人所想到的,肯定存在其他解决方案)。那么今天只对JAVA在引用层面如何控制多线程并发场景的。 ###二 JAVA对并发提供了哪些API### ####1.synchronized#### 提到java的并发,第一个让我想到的就是synchronized关键词。这是接触J2SE介绍线程的时候一定会介绍的。只要载方法前面加上整个关键词修饰,那么整个方法就是线程安全的,即某一时刻只会有一个线程进入该方法。如下:
<!--lang:java-->
public void synchronized threadSafeMethod(){
......
}
当然可以进行块级的控制,如下:
<!--lang:java-->
public void threadSafeBlock(){
synchronized(object){
//thread safe
}
}
####2.concurrent包#### 随着开发时间的推移知道了Doug lea这个人。知道了它的concurrent包,在该包中提供了基本满足JAVA在并发编程方面需要的API。比如原子操作类,线程安全的util类,异步执行的线程池,当然还有锁。 #####1)Lock#####
首先还是先上代码:
<!--lang:java-->
public class LockDemo {
private static final ReentrantLock lock = new ReentrantLock();
public void threadSafe(){
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
//do something
}finally {
lock.unlock();
}
}
}
上面例子是ReentrantLock一个基础的例子,可以通过开启锁的范围来定义需要并发控制的范围,从而可以调节代码载性能上面的影响,如果范围越大,那么执行的性能受到的影响就越大。所以确定好锁的范围很重要。ReentrantLock可以通过newCndition方法创建一个条件,从而可以多线程中,可以通过Condition进行通信。下面给出了一个比较经典的例子:
<!--lang:java-->
public class LockDemo {
private static final ReentrantLock lock = new ReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
private int maxSize=10;
private int currentSize=0;//存在多线程安全问题
private Object[] array = new Object[maxSize];
public void offer(Object item) throws InterruptedException {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
if(maxSize==currentSize){
notFull.await();//释放当前线程持有的锁,让给其他线程
}
array[currentSize]=item;
currentSize++;
notEmpty.signal();
}finally {
lock.unlock();
}
}
public Object pop() throws InterruptedException {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
if(currentSize==0){
notEmpty.await();//释放当前线程持有的锁
}
currentSize--;
Object item = array[currentSize];
array[currentSize]=null;
notFull.signal();
return item;
}finally {
lock.unlock();
}
}
}
上面代码实现了一个简单队列,offer和pop方法在进入方法第一步是申请一个锁,并且载最后都执行了解锁的操作。那么可以确保这两个方法在某一时刻肯定只能有一个线程执行,但是为了不超出队列的最大大小,offer和pop之间需要通信,当队列满了的时候,需要等待pop方法通知数组有空闲的位置,当队列空的时候,需要offer方法通知pop方法队列中已经有了数据,可以进行弹出。这个过程中Condition就起到了作用。载concurrent包中也提供了一套读写锁,实现了读写锁的机制,可以通过它来对同一个资源进行安全的多线程读写,下面也列举出了简单的例子:
<!--lang:java-->
public class LockDemo {
private static final ReadWriteLock readwriteLock = new ReentrantReadWriteLock();
public void read(){
Lock lock = readwriteLock.readLock();
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
//do read
}finally {
lock.unlock();
}
}
public void write(){
Lock lock = readwriteLock.writeLock();
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
//do write
}finally {
lock.unlock();
}
}
}
上面已经实现了一个简单的队列,并且载多线程下面是安全的,可以通过上面,可以实现一个线程安全的资源池,这个是基于锁的方式。那么可不可以不使用锁来实现一个池呢?那就需要用到信号量来进行控制了。代码如下:
<!--lang:java-->
public class Pool {
private Semaphore semaphore ;
private LinkedList<Object> resources = new LinkedList<Object>();
public Pool(int size){
semaphore = new Semaphore(size);//创建当前池可以容纳的多少资源
initResource();
}
private void initResource() {
//init resource
}
public void returnResource(Object item) throws InterruptedException {
resources.offer(item);
semaphore.release();//释放一个可用的信号
}
public Object pop() throws InterruptedException {
semaphore.acquire();//请求当前时候有资源
return resources.pop();
}
}
可以看到,每当一个线程来申请资源的时候,都需要通过Semaphore来申请,如果当前资源处于紧张不够,那么就会载 semaphore.acquire();进行阻塞,等待 semaphore.release();方法示范一个可用的资源。
#####2)atomic##### 我们知道Long类型在多线程操作下是不安全的,并且整形的“++/--”都不是线程安全的,因为看似只有一次操作,其实并不是,“++/--”其实就是“-1/+1”的操作,中间的操作并非原子性,至少需要取值,操作,赋值这三步,并且取值后,可能操作数被其他线程进行了修改,然后你拿到的是一个过期的数据,进行操作,然后再赋值给该变量,必然会导致数据的不正确性。那么你可能会说,我使用volatile修饰被操作的变量,但是volatile可以确保你读到的是最新的值,但是当多个线程都拿到最新的数据,并进行操作了,然后再写回内存的时候,同样也会导致并发情况的出现。
为了解决上面的问题,Doug lea载concurrent包下面设计了一套原子性的基础类型类。此时“++/--”在它里面是原子性的。解决该问题的方式,doug lea是使用了操作系统级别的cas(compare and set)的方式来做到的原子性的,它的底层是Unsafe类。如果有兴趣可以取了解一下,这里就不作过多的解释。
###三 如何合理的使用并发控制API###
整个话题貌似抛的有点大,我这里只阐述以下我的个人理解。从两天来进行分析。空间和时间。
####1 空间#### 这个怎么说?如果看过ConcurrentHashMap源码的人都应该知道它是怎么优化的。我们知道HashTable是全局锁,所有的数据都用一把锁,这样的缺点是导致其他不存在竞争的资源也被锁作了。而Doug lea优化的方式是将数据进行拆分,分成段,每段共用一把锁,这样就避免了所有数据共用一把锁的尴尬局面,从而我对其中一段进行了锁住的时候,其他段还是可以操作的,这样提高了整个Map的执行效率。这里就是做了锁的空间优化,将一把锁控制的范围缩小,从而减少线程阻塞的时间,以提高效率。 ####2 时间#### 这里说的时间,是指当一个线程获取了锁之后,在执行被锁住区域代码的时候,减少其执行时间,从而减少其他线程阻塞的时间。上面说的actomic其实是拿时间来换取可靠并发的。因为采用的是CAS模式。大家看以下下面的代码就理解CAS是在做什么了:
<!--lang:java-->
public class SimpleAtomicInteger
{
private volatile int value=0;
public int getAndIncrement(){
while(true){
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* 这里只是模拟系统底层的cas方法,假设这个方法是线程安全的
* @param current
* @param next
* @return
*/
private boolean compareAndSet(int current, int next) {
if(value==current){//假设当前的值没有变化,这进行赋值,否则返回false进行操作
value=next;
return true;
}
return false;
}
private int get() {
return value;
}
}
通过上段代码应该知道为什么CAS会很消耗时间,当资源竞争激烈的时候,可能一个线程一直阻塞载while循环里面,每当然,这是比较极端的情况。所以如果能够尽量减少cas,那么势必将会提高整个代码的效率(这里有一篇关于Doug lea对AtomicInteger的优化,从而提高它的效率http://ifeve.com/better_atomicinteger/,方案就是减少CAS)?那么如果我们能够减少锁住代码的执行时间,时候也会提高整个代码的并发执行效率。
以上纯属个人的理解,并不具备参考意见,其中如果存在误导,还望指出。