Java并发 阻塞队列

Wesley13
• 阅读 591

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作支持阻塞地插入和移除方法。支持阻塞插入的方法是指当队列满时会阻塞插入元素的线程,直到队列不满;支持阻塞移除的方法是指当队列为空时获取元素的线程无法继续获取元素直到队列不空。

可以发现阻塞队列非常适合消费者和生产者场景下进行使用,生产者生产数据就是向阻塞队列中插入元素,消费者消费数据就是从阻塞队列中移除元素。

Java提供了阻塞队列支持如下方法:

插入方法:add(e)(添加失败会抛出异常)、offer(e)(添加失败返回特殊值)、put(e)(添加失败会一直阻塞) 
移除方法:remove(e)(移除失败会抛出异常)、poll(e)(移除失败会返回特殊值)、take(e)(移除失败会一直阻塞)

在Java中提供了无界队列,这种情况下队列不可能出现满的情况(除非发生内存溢出),所以使用put和take方法永远不会被阻塞,offer返回的永远是true。

在Java中提供了7种阻塞队列,使用较多有四种:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue和DelayQueue。ArrayBlockingQueue是一个由数组结构组成的有界阻塞队列,LinkedBlockingQueue是一个由链表结构组成的有界阻塞队列,PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列,DelayQueue是一个使用优先级队列实现的支持延时获取元素的无界阻塞队列。DelayQueue适用于缓存系统的设计以及定时任务调度等场景。

那么阻塞队列是如何实现线程的同步的呢?使用通知模式实现

通知模式是指当生产者往满的队列添加元素的时候会阻塞生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列已经不满了,这时生产者可以继续往队列中添加元素。就ArrayBlockingQueue而言,是使用Condition条件变量实现通知模式的。

public ArrayBlockingQueue(int capacity, boolean fair) {
        //省略部分代码
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
//添加元素的方法
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            //如果队列不满就入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
 //入队的方法
 private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
 //移除元素的方法
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
 //出队的方法
 private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
  • 从源码可以看出,阻塞队列的实现仍然是使用了经典的等待/通知模式实现的。使用阻塞队列的好处在于使用者不用关心什么时候等待,什么时候进行通知,什么时候添加元素什么时候取元素都由使用者实现,让使用者可以更多关注业务的实现。那么对于上一篇文章提到的生产者消费者模式,如何使用阻塞队列实现呢?

下面代码演示了使用阻塞队列实现生产者消费者模式:

package com.rhwayfun.concurrency;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by rhwayfun on 16-4-4.
 */
public class ProducerConsumerModeWithBlockQueueTest {

    static class Info{
        //内容
        private String content;

        public Info(String content) {
            this.content = content;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return this.getContent();
        }
    }

    static class Producer implements Runnable{

        private final BlockingQueue<Info> blockingQueue;

        public Producer(BlockingQueue<Info> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        public void run() {
            boolean flag = true;
            for (int i = 0; i < 5; i++){
                if (flag){
                    try {
                        blockingQueue.put(new Info("contentA"));
                        System.out.println("[生产者]:contentA");
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    flag = false;
                }else {
                    try {
                        blockingQueue.put(new Info("contentB"));
                        System.out.println("[生产者]:contentB");
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    flag = true;
                }
            }
        }
    }

    static class Consumer implements Runnable{

        private final BlockingQueue<Info> blockingQueue;

        public Consumer(BlockingQueue<Info> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        public void run() {
            while (true){
                try {
                    System.out.println("[消费者]:" + blockingQueue.take());
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args){
        BlockingQueue<Info> blockingQueue = new LinkedBlockingQueue<Info>();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}
  • 可以发现,相比之前使用等待/通知模式实现的生产者消费者模式,使用阻塞队列实现的代码更加简洁,Info类无需添加任何同步方法,程序的可扩展性提高了提高,耦合度也降低了。
点赞
收藏
评论区
推荐文章
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java并发数据结构
一.BlockingDeque阻塞双端队列(线程安全):注意ArrayDeque和LinkedList仅仅扩展了Deque,是非阻塞类型的双端队列。BlockingQueue单向队列,其内部基于ReentrantLockCondition来控制同步和"阻塞"/"唤醒"的时
Wesley13 Wesley13
3年前
Java多线程之线程安全队列Queue
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。注:什么叫线程安全?这个首先要明确。
Stella981 Stella981
3年前
LinkedBlockingQueue 介绍
LinkedBlockingQueue是一个基于已链接节点的、范围任意的blockingqueue。此队列按FIFO(先进先出)排序元素。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可
Stella981 Stella981
3年前
BlockingQueue介绍
几种类型的BlockingQueueArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。DelayQueue:一个使用优先级队列实现的无界阻塞队列。Synchro
Wesley13 Wesley13
3年前
Java并发系列9
今天要讲的BlockingQueue可谓是大名鼎鼎,在并发编程中比较常见的一个类。BlockingQueue顾名思义是表示一个阻塞队列,注意这两个词:阻塞和队列。可以拿我们熟悉的生产者消费者队列来举例,一条流水线上,A生产零件,B组装零件,A就是生产者,B是消费者。如果A生成的太快,则零件堆积,A需要休息一会儿等待B把零件消费完;如果A生产的太
Wesley13 Wesley13
3年前
JAVA线程15
一、阻塞队列1\.概述阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue。阻塞队列是一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元
Stella981 Stella981
3年前
Netty学习笔记1:5种IO模型
1阻塞IO模型从字面来理解,就是调用时可能被阻塞,什么叫阻塞,要知道一个进程有N种状态,学过OS都知道如果阻塞,就会把当前进程放在某个条件的阻塞队列里。直到条件满足了,才会转移此进程进入就绪队列。当然,就绪队列还有个优先级的概念,就不扯远了。阻塞IO.1)调用API,比如 r
Stella981 Stella981
3年前
BlockingQueue(阻塞队列)详解
注意:该随笔内容完全引自http://wsmajunfeng.iteye.com/blog/1629354,写的很好,非常感谢,复制过来算是个积累,怕以后找不到。一.前言  在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程