Java中生产者与消费者模式

Wesley13
• 阅读 722

 生产者消费者模式

首先来了解什么是生产者消费者模式。该模式也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

2 用wait和notify实现

这种方法的基本原理是:一个线程负责生产数据,放到共享区域,然后通知另一个线程去消耗数据。如果没有wait()和notify(),消费者线程就要不停去检查是否有数据被产生。

接下来介绍一下wait()和notify(),在这里把它们和sleep()做一个对比,方便理解

不同点

sleep()

wait()和notify()

原理

线程用来控制自身流程,会使该线程暂停执行一段时间,把执行机会让给其它线程。时间一到就复苏。

是Object类的方法,会使当前拥有该对象锁的进程等待,直到其他线程调用notify()方法。

锁的处理机制

只是让线程暂停执行一段时间,不会释放锁

调用wait(),线程会释放掉锁

使用区域

必须放在同步控制方法或者同步语句块中

可以放在任何地方

异常

必须捕获异常,例如InterruptedException等

不用捕获异常

sleep不会释放锁,容易导致死锁(在我的上一篇博客 JAVA多线程(二)竞态条件、死锁及同步机制有描述)。因此推荐使用wait()和notify()。下面转载一份源代码,

import java.util.LinkedList;
import java.util.Queue;
import org.apache.log4j.Logger;

public class InterThreadCommunicationExample { public static void main(String args[]) { final Queue sharedQ = new LinkedList(); Thread producer = new Producer(sharedQ); Thread consumer = new Consumer(sharedQ); producer.start(); consumer.start(); } } public class Producer extends Thread { private static final Logger logger = Logger.getLogger(Producer.class); private final Queue sharedQ; public Producer(Queue sharedQ) { super("Producer"); this.sharedQ = sharedQ; } @Override public void run() { for (int i = 0; i < 4; i++) { synchronized (sharedQ) { //waiting condition - wait until Queue is not empty while (sharedQ.size() >= 1) { try { logger.debug("Queue is full, waiting"); sharedQ.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } logger.debug("producing : " + i); sharedQ.add(i); sharedQ.notify(); } } } } public class Consumer extends Thread { private static final Logger logger = Logger.getLogger(Consumer.class); private final Queue sharedQ; public Consumer(Queue sharedQ) { super("Consumer"); this.sharedQ = sharedQ; } @Override public void run() { while(true) { synchronized (sharedQ) { //waiting condition - wait until Queue is not empty while (sharedQ.size() == 0) { try { logger.debug("Queue is empty, waiting"); sharedQ.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } int number = sharedQ.poll(); logger.debug("consuming : " + number ); sharedQ.notify(); //termination condition if(number == 3){break; } } } } } Output: 05:41:57,244 0 [Producer] DEBUG concurrency.Producer - producing : 0 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 0 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 1 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 1 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 2 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - Queue is full, waiting 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - consuming : 2 05:41:57,260 16 [Consumer] DEBUG concurrency.Consumer - Queue is empty, waiting 05:41:57,260 16 [Producer] DEBUG concurrency.Producer - producing : 3 05:41:57,276 32 [Consumer] DEBUG concurrency.Consumer - consuming : 3 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

3 使用阻塞队列实现

使用wait()和notify()是经典方法,这里介绍一个高级方法。

BlockingQueue中提供了put()和take()方法,可以极大简化生产者消费者模式的实现过程。这一过程的基本原理是,如果队列满了,put()方法就会被阻塞;如果队列是空的,take()方法会阻塞。与传统的wait()和notify()方法相比,使用阻塞队列更简单,更便于理解。下面是一个简单的例子:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){ //Creating shared object BlockingQueue sharedQueue = new LinkedBlockingQueue(); //Creating Producer and Consumer Thread Thread prodThread = new Thread(new Producer(sharedQueue)); Thread consThread = new Thread(new Consumer(sharedQueue)); //Starting producer and Consumer thread prodThread.start(); consThread.start(); } } //Producer Class in java class Producer implements Runnable { private final BlockingQueue sharedQueue; public Producer(BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=0; i<10; i++){ try { System.out.println("Produced: " + i); sharedQueue.put(i); } catch (InterruptedException ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } } //Consumer Class in Java class Consumer implements Runnable{ private final BlockingQueue sharedQueue; public Consumer (BlockingQueue sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { while(true){ try { System.out.println("Consumed: "+ sharedQueue.take()); } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } } Output: Produced: 0 Produced: 1 Consumed: 0 Produced: 2 Consumed: 1 Produced: 3 Consumed: 2 Produced: 4 Consumed: 3 Produced: 5 Consumed: 4 Produced: 6 Consumed: 5 Produced: 7 Consumed: 6 Produced: 8 Consumed: 7 Produced: 9 Consumed: 8 Consumed: 9 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

在后面的一篇博客中,给本文提供了补充的例子。用lock、synchronized、阻塞队列三种方法实现生产者消费者模式,实现的内容是生产者产生随机数(为了方便阅读结果,我把随机数限定在10以内的整数),消费者读取并打印。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
捉虫大师 捉虫大师
3年前
一种极致性能的缓冲队列
本文已收录https://github.com/lkxiaolou/lkxiaolou欢迎star。背景在多线程下的生产者消费者模型中,需求满足如下情况:对生产者生产投递数据的性能要求非常高多个生产者,单个(多个也可以,本文只介绍单个的情况)消费者当消费者跟不上生产者速度时,可容忍少部分数据丢失生产者是单条单条地生产数据举个日志采集的例子,日志在不同的
九章 九章
3年前
一 java线程的等待/通知模型
java中线程之间的通信问题,有这么一个模型:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,也可以叫做生产者消费者问题生产者生产了产品,如何通知消费者?下面就介绍下java线程中的等待通知机制。其它语言类似,自行研究。代码附上下面是以买小
Wesley13 Wesley13
3年前
java多线程之消费者生产者模式
/@authorshijin生产者与消费者模型中,要保证以下几点:1同一时间内只能有一个生产者生产生产方法加锁sychronized2同一时间内只能有一个消费者消费消费方法加锁sychronized3生产者生产的同时消费者不能消费生产方法加锁sychronized
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Qt中的QThread:使用QSemaphore进行多线程数据同步
20210127:在生产者、消费者的方法中添加线程挂起方法QThread::usleep(10),使ui不卡。20210128:在添加Track类(保存生产者Producer生成的每组数据),在ui界面中使用modelview同步显示生产者生成的数据,modelview不会对主线程造成卡顿。对消费者同样创建view,还没有进行model绑定。避免
Wesley13 Wesley13
3年前
Java多线程之线程协作
常见的线程协作方式是:生产者/消费者。一个线程作为生产者,生产要处理数据,比如拿一个线程来生产Order,用户每下一单,此线程就生产一个Order对象。设置一个仓库,来存放生产出来的Order对象。一个线程作为消费者,消费|处理仓库中的Order对象(打印订单、拣货、发货)。demo  订单处理流程1、用
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce