接着上一篇博客的 一Java线程的等待/通知模型 ,没有看过的建议先看一下。下面我们用等待通知机制来实现一个线程池.
本文的代码放到了github上,地址如下: git@github.com:jiulu313/ThreadPool.git
线程的任务就以打印一行文本来模拟耗时的任务。主要代码如下:
1 定义一个任务的接口。
/* 任务的接口 3 */
public interface Task {
void doSomething();
}
2 实现一个具体的任务。
/*
* 具体的任务
*/
public class PrintTask implements Task{
//打印一句话,睡一秒,来模拟耗时的任务
@Override
public void doSomething() {
System.out.println("任务:"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3 实现工作线程
/*
* 工作者线程
*/
public class Worker implements Runnable {
//线程是否正在运行
private boolean running = true;
//保存Thread,方便start()
private Thread thread;
//保存线程池的任务队列,作同步用
private LinkedList<Task> tasks;
public void setThread(Thread thread) {
this.thread = thread;
}
public void setTasks(LinkedList<Task> tasks) {
this.tasks = tasks;
}
//启动此工作线程
public void start() {
if (thread != null) {
thread.start();
}
}
// 关闭此工作线程
public void shutDown() {
running = false;
thread.interrupt();
}
@Override
public void run() {
while (running) {
Task task = null;
//对共享变量加锁,此处为任务队列,因为会有多个线程访问
synchronized (tasks) {
//当条件不满足时,线程等待,见上一篇博文
while (tasks.isEmpty()) {
try {
//线程进入等待状态,并且释放锁
tasks.wait();
} catch (InterruptedException e) {
//感知到外部对此线程的中断操作
Thread.currentThread().interrupt();
return;
}
}
//条件满足
task = tasks.removeFirst();
}
//执行任务
if (task != null) {
task.doSomething();
}
}
}
}
4 创建一个线程池
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class DefaultThreadPool implements ThreadPool {
private int maxWorksNum = 10;
private int minWorksNum = 1;
private int defaultWorksNum = 5;
// 任务列表
private LinkedList<Task> tasks = new LinkedList<>();
// 工作线程列表
private LinkedList<Worker> workers = new LinkedList<>();
//工作线程个数
private int workerNum = defaultWorksNum;
@Override
public void excute(Task task) {
// 添加一个工作,然后进行通知
if (task != null) {
synchronized (tasks) {
//添加到最后一个位置
tasks.addLast(task);
//通知等待的线程,有新的任务了
tasks.notify();
}
}
}
// 关闭线程池
@Override
public void shutDown() {
for (Worker worker : workers) {
worker.shutDown();
}
}
// 初始化工作者线程
public void initWorkers(int num) {
if (num > maxWorksNum) {
num = maxWorksNum;
} else if (num < minWorksNum) {
num = minWorksNum;
} else {
num = defaultWorksNum;
}
for (int i = 0; i < workerNum; i++) {
//创建工作线程
Worker worker = new Worker();
//添加到工作队列
workers.add(worker);
//新建一个线程对象,并将worker赋值
Thread thread = new Thread(worker);
//设置线程对象,作启动,中断用
worker.setThread(thread);
//设置任务队列,作同步用
worker.setTasks(tasks);
}
}
// 启动线程池
public void start(){
if(workers != null){
for(Worker worker : workers){
//启动一个工作线程
worker.start();
}
}
}
// 新增加工作线程,但是不能大于线程池最大线程数
@Override
public void addWorkers(int num) {
if (num <= 0) {
return;
}
int remain = maxWorksNum - workerNum;
if (num > remain) {
num = remain;
}
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker);
thread.start();
}
workerNum = workers.size();
}
// 减少工作线程,至少留1个,不能减少到0
@Override
public void removeWorkers(int num) {
if(num >= workerNum || num <= 0){
return;
}
for(int i =0;i<num;i++){
Worker worker = workers.getLast();
worker.shutDown();
}
workerNum = workers.size();
}
@Override
public int getTaskSize() {
return tasks.size();
}
}
5 新建测试类
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
//1 新建一个线程池
DefaultThreadPool pool = new DefaultThreadPool();
//2 设置线程池的大小为5
pool.initWorkers(5);
//3 启动线程池
pool.start();
//4 往任务队列里面添加任务
for(int i = 0;i<100;i++){
pool.excute(new PrintTask());
}
}
}
在eclipse中运行,结果部分截图如下:
好了,一个线程池就这样,这只是一个小例子,提示线程池的原理
其实实现工作中,一个线程池要考虑的问题远比这个多,也更复杂。
其中比较重要的两个方面:
1 线程池开几个线程为最合适?
我们知道,线程不是开的越多越好,而要根据业务的场景,硬件的指标,带宽的大小等等
一般线程池的个数为CPU核心数的个数加1 ,google的建议。此外可能还要具体分析业务
大,中,小的业务需求,也是不一样的。
大任务:比如下载一部电影,可能要十几分钟甚至几十分钟的任务
中任务:比如下载一幅图片,有1M以上了到十几M的大小的。
小任务:比如下载的是游戏的ico,就十几K的到1M以下的。
小任务可以多开几个线程。
中任务的可以保守点。
大任务的尽量不要开的线程太多
具体值还需要看具体业务,具体场景。这些只是建议。
2 线程用哪种队列,也是和上面有关系。
今天就到这了,后续还会抽时间研究线程并发这块,希望对大家有帮忙。