生产者消费者模式
首先来了解什么是生产者消费者模式。该模式也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
2 用wait和notify实现
这种方法的基本原理是:一个线程负责生产数据,放到共享区域,然后通知另一个线程去消耗数据。如果没有wait()和notify(),消费者线程就要不停去检查是否有数据被产生。
接下来介绍一下wait()和notify(),在这里把它们和sleep()做一个对比,方便理解
不同点
sleep()
wait()和notify()
原理
线程用来控制自身流程,会使该线程暂停执行一段时间,把执行机会让给其它线程。时间一到就复苏。
是Object类的方法,会使当前拥有该对象锁的进程等待,直到其他线程调用notify()方法。
锁的处理机制
只是让线程暂停执行一段时间,不会释放锁
调用wait(),线程会释放掉锁
使用区域
必须放在同步控制方法或者同步语句块中
可以放在任何地方
异常
必须捕获异常,例如InterruptedException等
不用捕获异常
sleep不会释放锁,容易导致死锁(在我的上一篇博客 JAVA多线程(二)竞态条件、死锁及同步机制有描述)。因此推荐使用wait()和notify()。下面转载一份源代码,
import java.util.LinkedList;
import java.util.Queue;
import org.apache.log4j.Logger;
public class InterThreadCommunicationExample { public static void main(String args[]) { final Queue sharedQ = new LinkedList(); Thread producer = new Producer(sharedQ); Thread consumer = new Consumer(sharedQ); producer.start(); consumer.start(); } } public class Producer extends Thread { private static final Logger logger = Logger.getLogger(Producer.class); private final Queue sharedQ; public Producer(Queue sharedQ) { super("Producer"); this.sharedQ = sharedQ; } @Override public void run() { for (int i = 0; i < 4; i++) { synchronized (sharedQ) { //waiting condition - wait until Queue is not empty while (sharedQ.size() >= 1) { try { logger.debug("Queue is full, waiting"); sharedQ.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } logger.debug("producing : " + i); sharedQ.add(i); sharedQ.notify(); } } } } public class Consumer extends Thread { private static final Logger logger = Logger.getLogger(Consumer.class); private final Queue sharedQ; public Consumer(Queue sharedQ) { super("Consumer"); this.sharedQ = sharedQ; } @Override public void run() { while(true) { synchronized (sharedQ) { //waiting condition - wait until Queue is not empty while (sharedQ.size() == 0) { try { logger.debug("Queue is empty, waiting"); sharedQ.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } int number = sharedQ.poll(); logger.debug("consuming : " + number ); sharedQ.notify(); //termination condition if(number == 3){break; } } } } } Output: 05:41:57,244 0 [Producer] DEBUG concurrency.Producer - producing : 0 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 0 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 1 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 1 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 2 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 2 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 3 05:41:57,276 32 [Consumer] DEBUG concurrency.Consumer - consuming : 3
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
3 使用阻塞队列实现
使用wait()和notify()是经典方法,这里介绍一个高级方法。
BlockingQueue中提供了put()和take()方法,可以极大简化生产者消费者模式的实现过程。这一过程的基本原理是,如果队列满了,put()方法就会被阻塞;如果队列是空的,take()方法会阻塞。与传统的wait()和notify()方法相比,使用阻塞队列更简单,更便于理解。下面是一个简单的例子:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ProducerConsumerPattern {
public static void main(String args[]){ //Creating shared object BlockingQueue sharedQueue = new LinkedBlockingQueue(); //Creating Producer and Consumer Thread Thread prodThread = new Thread(new Producer(sharedQueue)); Thread consThread = new Thread(new Consumer(sharedQueue)); //Starting producer and Consumer thread prodThread.start(); consThread.start(); } } //Producer Class in java class Producer implements Runnable { private final BlockingQueue sharedQueue; public Producer(BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=0; i<10; i++){ try { System.out.println("Produced: " + i); sharedQueue.put(i); } catch (InterruptedException ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } } //Consumer Class in Java class Consumer implements Runnable{ private final BlockingQueue sharedQueue; public Consumer (BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { while(true){ try { System.out.println("Consumed: "+ sharedQueue.take()); } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } } Output: Produced: 0 Produced: 1 Consumed: 0 Produced: 2 Consumed: 1 Produced: 3 Consumed: 2 Produced: 4 Consumed: 3 Produced: 5 Consumed: 4 Produced: 6 Consumed: 5 Produced: 7 Consumed: 6 Produced: 8 Consumed: 7 Produced: 9 Consumed: 8 Consumed: 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
在后面的一篇博客中,给本文提供了补充的例子。用lock、synchronized、阻塞队列三种方法实现生产者消费者模式,实现的内容是生产者产生随机数(为了方便阅读结果,我把随机数限定在10以内的整数),消费者读取并打印。