Java并发新构件之PriorityBlockingQueue

Wesley13
• 阅读 869

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private static int counter = 1;
    private final int priority;
    private Random random = new Random(47);
    private final int id = counter++;
    protected static List<PrioritizedTask> sequence = new ArrayList<>();
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public int compareTo(PrioritizedTask o) {
        int val = this.priority - o.priority;
        //higher value, higher priority
        return val < 0 ? 1 : (val > 0 ? -1 : 0);
    }
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }
    @Override
    public String toString() {
        return String.format("P=[%1$-3d]", priority) + ", ID=" + id;
    }
    public static class EndFlagTask extends PrioritizedTask {
        private ExecutorService exec;
        public EndFlagTask(ExecutorService executorService) {
            super(-1);//最低的优先级,即最后出列的
            exec = executorService;
        }
        @Override
        public void run() {
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }
    @Override
    public void run() {
        try {
            //慢慢的添加高优先级的任务
            for (int i = 0; i < 6; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(9)); //6个优先级9
            }
            //先创建2个P=0的任务
            queue.add(new PrioritizedTask(0));
            queue.add(new PrioritizedTask(0));
            //添加低优先级的任务
            for (int i = 0; i < 6; i++) {// 优先级0-5
                queue.add(new PrioritizedTask(i));
            }
            //添加一个结束标志的任务
            queue.add(new PrioritizedTask.EndFlagTask(exec));
            
        } catch (InterruptedException e) {
            // TODO: handle exception
        }
        System.out.println("Finished PrioritizedTaskProducer.");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //不停的从queue里面取任务,直到exec停止。
            while(!Thread.interrupted()) {
                //使用当前线程来跑这些任务
                queue.take().run();
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Finished PrioritizedTaskConsumer.");
    }
}

public final class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

执行结果:

P=[9  ], ID=1
P=[9  ], ID=2
P=[9  ], ID=3
P=[9  ], ID=4
P=[9  ], ID=5
Finished PrioritizedTaskProducer.
P=[9  ], ID=6
P=[5  ], ID=14
P=[4  ], ID=13
P=[3  ], ID=12
P=[2  ], ID=11
P=[1  ], ID=10
P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8
P=[-1 ], ID=15 calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

P=[5  ], ID=14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException).

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java爬虫之JSoup使用教程
title:Java爬虫之JSoup使用教程date:201812248:00:000800update:201812248:00:000800author:mecover:https://imgblog.csdnimg.cn/20181224144920712(https://www.oschin
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
18小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(