- ThreadFactory
ThreadFactory是一个接口,它只有一个方法,用来创建线程:
Thread newThread(Runnable r);
通过自定义ThreadFactory来定制线程池,可以跟踪线程池究竟在何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息,甚至可以任性地将所有的线程设置为守护线程。总之,使用自定义线程池可以让我们更加自由地设置池子中所有线程的状态。
package com.thread.t02;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates passing a custom {@link ThreadFactory} to a {@link ThreadPoolExecutor}.
 *
 * <p>The factory marks every thread it creates as a daemon thread and logs the
 * creation, so the program can exit once {@code main} returns without shutting
 * the pool down explicitly.
 */
public class Test04 {

    /**
     * Task that prints the current time in milliseconds together with the id of
     * the executing thread, then sleeps for 100 ms.
     */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the code running this task can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a pool of 5 threads backed by a {@link SynchronousQueue}, submits the
     * same task five times and then waits two seconds before returning.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        // Daemon threads do not keep the JVM alive after main() returns.
                        t.setDaemon(true);
                        System.out.println("create " + t);
                        return t;
                    }
                });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        // Give the daemon workers time to run before main() (and the JVM) exits.
        Thread.sleep(2000);
    }
}
- 扩展线程池
ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute(),afterExecute(),terminated()三个接口对线程池进行控制。
package com.thread.t02;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates extending {@link ThreadPoolExecutor} through its
 * {@code beforeExecute}, {@code afterExecute} and {@code terminated} hooks.
 */
public class Test05 {

    /** Named task that logs which thread runs it and then sleeps for 100 ms. */
    public static class MyTask implements Runnable {
        /** Display name printed by the task and by the pool hooks. */
        public String name;

        /**
         * @param name display name of this task
         */
        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the worker running this task can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs five named tasks on a pool whose hooks log the start and end of every
     * task and the termination of the pool.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                // The cast relies on tasks being handed in via execute(); a task passed
                // through submit() would presumably arrive wrapped in a Future.
                System.out.println("准备执行:" + ((MyTask) r).name);
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // Completion message; previously this repeated the "about to run" label.
                System.out.println("执行完成:" + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("Task-GEMY-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        // shutdown() lets already submitted tasks finish; terminated() fires afterwards.
        es.shutdown();
    }
}
- 优化线程池线程数量
Ncpu=CPU的数量
Ucpu=目标CPU的使用率,0<=Ucpu<=1
W/C = 等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads=Ncpu*Ucpu*(1+W/C)
在java中,可以通过
Runtime.getRuntime().availableProcessors()
获得可用的CPU的数量。
在线程池中寻找堆栈
package com.thread.t02;
import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
/**
 * Demonstrates how the way a task is handed to a thread pool affects whether
 * an exception thrown by the task is reported.
 */
public class Test06 {

    /** Task that divides {@code a} by {@code b} and prints the result. */
    public static class DivTask implements Runnable {
        int a, b;

        /**
         * @param a dividend
         * @param b divisor
         */
        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            // Both operands are int, so this is integer division widened to double
            // afterwards: 100/3 prints 33.0 and a zero divisor throws
            // ArithmeticException, which is what this demo relies on.
            double re = a / b;
            System.out.println(re);
        }
    }

    /**
     * Executes {@code 100 / i} for {@code i} in 0..4 on an unbounded pool; the
     * first task divides by zero.
     *
     * @param args unused
     * @throws InterruptedException declared for callers; not thrown by the current body
     * @throws ExecutionException declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            // Alternative: pools.submit(new DivTask(100, i));
            // execute() is used instead of submit() so that the stack trace of the
            // failing task is printed; submit() would not print it.
            pools.execute(new DivTask(100, i));
        }
    }
}
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.thread.t02.Test06$DivTask.run(Test06.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
100.0
50.0
25.0
33.0
从这个异常堆栈中,我们只能知道异常是在哪里抛出的。但是我们希望得到另外一个更重要的信息,那就是这个任务到底是在哪里提交的?
package com.thread.t02;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * {@link ThreadPoolExecutor} that remembers where each task was handed to the
 * pool and prints that client-side stack trace when the task fails.
 *
 * <p>The stack trace of an exception thrown inside a worker only shows where the
 * task failed. This class captures an additional {@link Exception} at submission
 * time so the submitting call site can be printed as well.
 */
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    /** Task that divides {@code a} by {@code b} and prints the result. */
    public static class DivTask implements Runnable {
        int a, b;

        /**
         * @param a dividend
         * @param b divisor
         */
        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            // Integer division on purpose: a zero divisor throws ArithmeticException.
            double re = a / b;
            System.out.println(re);
        }
    }

    /**
     * Creates the pool; all arguments are passed unchanged to
     * {@link ThreadPoolExecutor}.
     *
     * @param corePoolSize    see {@link ThreadPoolExecutor}
     * @param maximumPoolSize see {@link ThreadPoolExecutor}
     * @param keepAliveTime   see {@link ThreadPoolExecutor}
     * @param unit            see {@link ThreadPoolExecutor}
     * @param workQueue       see {@link ThreadPoolExecutor}
     */
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Decorates {@code task} so that, if it throws, the stack trace captured at
     * submission time is printed before the exception is rethrown.
     *
     * @param task        the task to run
     * @param clientStack exception created on the submitting thread; only its
     *                    stack trace is used
     * @return a runnable that delegates to {@code task}
     */
    private Runnable wrap(final Runnable task, final Exception clientStack) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    // Print where the task was submitted, then let the original
                    // failure propagate unchanged.
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }

    /**
     * Executes {@code task} with submission-site tracing.
     *
     * @param task the task to execute
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            // Fail on the caller's thread, as ThreadPoolExecutor.execute does, instead
            // of wrapping null and failing later inside a worker.
            throw new NullPointerException();
        }
        super.execute(wrap(task, clientTrace()));
    }

    /**
     * Submits {@code task} with submission-site tracing.
     *
     * <p>The raw task is wrapped before it is turned into a {@link Future}; a
     * failure is therefore seen (and the client trace printed) before the future
     * captures the exception.
     *
     * @param task the task to submit
     * @return a future representing the pending completion of the task
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        return super.submit(wrap(task, clientTrace()));
    }

    /** Captures the current (submitting) thread's stack in an exception object. */
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    /**
     * Executes {@code 100 / i} for {@code i} in 0..4; the first task divides by
     * zero and triggers the client stack trace output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
                0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.execute(new DivTask(100, i));
        }
    }
}
现在,我们不仅可以得到异常发生的Runnable实现内的信息,也知道这个任务是在哪里提交的。
java.lang.Exception: Client stack trace100.0
33.0
at com.thread.t02.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:50)
at com.thread.t02.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:46)
at com.thread.t02.TraceThreadPoolExecutor.main(TraceThreadPoolExecutor.java:58)
50.0
Exception in thread "pool-1-thread-1" 25.0java.lang.ArithmeticException: / by zero
at com.thread.t02.TraceThreadPoolExecutor$DivTask.run(TraceThreadPoolExecutor.java:20)
at com.thread.t02.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)