二 Java利用等待/通知机制实现一个线程池

九章
• 阅读 1551

接着上一篇博客的 一Java线程的等待/通知模型 ,没有看过的建议先看一下。下面我们用等待通知机制来实现一个线程池.

本文的代码放到了github上,地址如下: git@github.com:jiulu313/ThreadPool.git

线程的任务就以打印一行文本来模拟耗时的任务。主要代码如下:

1 定义一个任务的接口。

  /* 任务的接口 3  */
 public interface Task { 
     void doSomething(); 
}

2 实现一个具体的任务。

/*
 * 具体的任务
 */
public class PrintTask implements Task{

    //打印一句话,睡一秒,来模拟耗时的任务
    @Override
    public void doSomething() {
        System.out.println("任务:"+Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3 实现工作线程

    /*
 * 工作者线程
 */
public class Worker implements Runnable {
    //线程是否正在运行
    private boolean running = true;

    //保存Thread,方便start()
    private Thread thread;

    //保存线程池的任务队列,作同步用
    private LinkedList<Task> tasks;

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    public void setTasks(LinkedList<Task> tasks) {
        this.tasks = tasks;
    }

    //启动此工作线程
    public void start() {
        if (thread != null) {
            thread.start();
        }
    }

    // 关闭此工作线程
    public void shutDown() {
        running = false;
        thread.interrupt();
    }

    @Override
    public void run() {
        while (running) {
            Task task = null;

            //对共享变量加锁,此处为任务队列,因为会有多个线程访问
            synchronized (tasks) {

                //当条件不满足时,线程等待,见上一篇博文
                while (tasks.isEmpty()) {
                    try {
                        //线程进入等待状态,并且释放锁
                        tasks.wait();
                    } catch (InterruptedException e) {
                        //感知到外部对此线程的中断操作
                        Thread.currentThread().interrupt();
                        return;
                    }
                }

                //条件满足
                task = tasks.removeFirst();
            }

            //执行任务
            if (task != null) {
                task.doSomething();
            }
        }
    }
}

4 创建一个线程池

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class DefaultThreadPool implements ThreadPool {
    private int maxWorksNum = 10;
    private int minWorksNum = 1;
    private int defaultWorksNum = 5;

    // 任务列表
    private LinkedList<Task> tasks = new LinkedList<>();

    // 工作线程列表
    private LinkedList<Worker> workers = new LinkedList<>();

    //工作线程个数
    private int workerNum = defaultWorksNum;


    @Override
    public void excute(Task task) {
        // 添加一个工作,然后进行通知
        if (task != null) {
            synchronized (tasks) {
                //添加到最后一个位置
                tasks.addLast(task);
                //通知等待的线程,有新的任务了
                tasks.notify();
            }
        }
    }

    // 关闭线程池
    @Override
    public void shutDown() {
        for (Worker worker : workers) {
            worker.shutDown();
        }
    }

    // 初始化工作者线程
    public void initWorkers(int num) {
        if (num > maxWorksNum) {
            num = maxWorksNum;
        } else if (num < minWorksNum) {
            num = minWorksNum;
        } else {
            num = defaultWorksNum;
        }

        for (int i = 0; i < workerNum; i++) {
            //创建工作线程
            Worker worker = new Worker();

            //添加到工作队列
            workers.add(worker);

            //新建一个线程对象,并将worker赋值
            Thread thread = new Thread(worker);

            //设置线程对象,作启动,中断用
            worker.setThread(thread);

            //设置任务队列,作同步用
            worker.setTasks(tasks);
        }
    }

    // 启动线程池
    public void start(){
        if(workers != null){
            for(Worker worker : workers){
                //启动一个工作线程
                worker.start();
            }
        }
    }

    // 新增加工作线程,但是不能大于线程池最大线程数
    @Override
    public void addWorkers(int num) {
        if (num <= 0) {
            return;
        }

        int remain = maxWorksNum - workerNum;
        if (num > remain) {
            num = remain;
        }

        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker);
            thread.start();
        }

        workerNum = workers.size();
    }

    // 减少工作线程,至少留1个,不能减少到0
    @Override
    public void removeWorkers(int num) {
        if(num >= workerNum || num <= 0){
            return;
        }

        for(int i =0;i<num;i++){
            Worker worker = workers.getLast();
            worker.shutDown();
        }

        workerNum = workers.size();
    }

    @Override
    public int getTaskSize() {
        return tasks.size();
    }


}

5 新建测试类

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        //1 新建一个线程池
        DefaultThreadPool pool = new DefaultThreadPool();

        //2 设置线程池的大小为5
        pool.initWorkers(5);

        //3 启动线程池
        pool.start();

        //4 往任务队列里面添加任务
        for(int i = 0;i<100;i++){
            pool.excute(new PrintTask());
        }

    }
}

在eclipse中运行,结果部分截图如下:

二 Java利用等待/通知机制实现一个线程池

好了,一个线程池就这样,这只是一个小例子,提示线程池的原理

其实实现工作中,一个线程池要考虑的问题远比这个多,也更复杂。

其中比较重要的两个方面:

1 线程池开几个线程为最合适?

我们知道,线程不是开的越多越好,而要根据业务的场景,硬件的指标,带宽的大小等等

一般线程池的个数为CPU核心数的个数加1 ,google的建议。此外可能还要具体分析业务

大,中,小的业务需求,也是不一样的。

大任务:比如下载一部电影,可能要十几分钟甚至几十分钟的任务

中任务:比如下载一幅图片,有1M以上了到十几M的大小的。

小任务:比如下载的是游戏的ico,就十几K的到1M以下的。

小任务可以多开几个线程。

中任务的可以保守点。

大任务的尽量不要开的线程太多

具体值还需要看具体业务,具体场景。这些只是建议。

2 线程用哪种队列,也是和上面有关系。

今天就到这了,后续还会抽时间研究线程并发这块,希望对大家有帮忙。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java 面试知识点笔记(十三)多线程与并发
java线程池,利用Exceutors创建不同的线程池满足不同场景需求:1.newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。2.
九章 九章
3年前
一 java线程的等待/通知模型
java中线程之间的通信问题,有这么一个模型:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,也可以叫做生产者消费者问题生产者生产了产品,如何通知消费者?下面就介绍下java线程中的等待通知机制。其它语言类似,自行研究。代码附上下面是以买小
Wesley13 Wesley13
3年前
Java通过Executors提供四种线程池
Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建
Wesley13 Wesley13
3年前
03.Android崩溃Crash库之ExceptionHandler分析
目录总结00.异常处理几个常用api01.UncaughtExceptionHandler02.Java线程处理异常分析03.Android中线程处理异常分析04.为何使用setDefaultUncaughtExceptionHandler前沿上一篇整体介绍了crash崩溃
Wesley13 Wesley13
3年前
Java基础教程——线程池
启动新线程,需要和操作系统进行交互,成本比较高。使用线程池可以提高性能——线程池会提前创建大量的空闲线程,随时待命执行线程任务。在执行完了一个任务之后,线程会回到空闲状态,等待执行下一个任务。(这个任务,就是Runnable的run()方法,或Callable的call()方法)。Java5之前需要手动实现线程池,Java5之
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
Java 基础知识(七)
1.创建线程池1)newCacheThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 2)newFixedThreadPool  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 3)newScheduledThreadPool  创建一个定长线程池,支持
Wesley13 Wesley13
3年前
Java并发编程原理与实战二十三:Condition原理分析
先来回顾一下java中的等待/通知机制我们有时会遇到这样的场景:线程A执行到某个点的时候,因为某个条件condition不满足,需要线程A暂停;等到线程B修改了条件condition,使condition满足了线程A的要求时,A再继续执行。自旋实现的等待通知最简单的实现方法就是将condition设为一个volatile的变量