Runnable
runnable是线程实现的一种方式(接口实现),它只有一个run()函数,用于将耗时操作写在其中,该函数没有返回值。然后使用某个线程去执行该runnable的实现运行多线程程序,Thread类在调用start()函数后就是执行的是runnable的run()函数。runnable的声明如下:
public interface Runnable {
/**
* When an object implementing interface
Runnable
is used* to create a thread, starting the thread causes the object's
*
run
method to be called in that separately executing thread.* @see java.lang.Thread#run()
*/
public abstract void run();
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
通过实现Runnable接口来创建Thread线程:
步骤1:创建实现Runnable接口的类:
class SomeRunnable implements Runnable
{
public void run()
{
//do something here
}
}步骤2:创建一个类对象:
Runnable oneRunnable = new SomeRunnable();
步骤3:由Runnable创建一个Thread对象:
Thread oneThread = new Thread(oneRunnable);
步骤4:启动线程:
oneThread.start();
至此,一个线程就创建完成了。
注释:线程的执行流程很简单,当执行代码oneThread.start();时,就会执行oneRunnable对象中的void run();方法,
该方法执行完成后,线程就消亡了。
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Callable
Callable与Runnable的功能大致相似,Callable中有一个call()函数,但是call()函数有返回值,而Runnable的run()函数不能将结果返回给客户程序。Callable的声明如下 :
public interface Callable
{
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
通过实现Callable接口来创建Thread线程:其中,Callable接口(也只有一个方法)定义如下:
public interface Callable
{
V call() throws Exception;
}
步骤1:创建实现Callable接口的类SomeCallable(略); 步骤2:创建一个类对象:
Callable
oneCallable = new SomeCallable (); 步骤3:由Callable
创建一个FutureTask 对象: FutureTask
oneTask = new FutureTask (oneCallable); 注释:FutureTask
是一个包装器,它通过接受Callable 来创建,它同时实现了Future和Runnable接口。
步骤4:由FutureTask创建一个Thread对象: Thread oneThread = new Thread(oneTask);
步骤5:启动线程:
oneThread.start();
至此,一个线程就创建完成了。
Future
Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行
取消、****查询是否完成、获取结果、设置结果操作。get方法会阻塞,直到任务返回结果(Future简介)。
Future类位于java.util.concurrent包下,它是一个接口:
public
interface
Future<V> {
boolean
cancel(``boolean
mayInterruptIfRunning);
boolean
isCancelled();
boolean
isDone();
V get()
throws
InterruptedException, ExecutionException;
V get(``long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException;
}
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数 mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若 mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论 mayInterruptIfRunning为true还是false,肯定返回true。
isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
Future使用示例:
package com.test.current;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class FutureTest {
public static void main(String[] args) {
//指定目录和关键字
String directory = "C:\\Program Files\\Java\\jdk1.6.0_29";
String keyword = "volatile";
//实现Callable接口的线程
MatchCounter counter = new MatchCounter(new File(directory),keyword);
FutureTask
task = new FutureTask (counter); Thread t = new Thread(task);
t.start();
try{
System.out.println(task.get() + "matching files");
}catch (Exception e) {
e.printStackTrace();
}
}
}
//这个线程用户向队列中添加文件
class MatchCounter implements Callable
{ private File directory;
private String keyword;
private int count;
public static File DUMMY = new File("");//这个文件作为一个结束标记
public MatchCounter(File directory,String keyword){
this.directory = directory;
this.keyword = keyword;
}
public Integer call(){
count = 0;
try {
File[] files = directory.listFiles();
ArrayList<Future
> results = new ArrayList<Future >(); for (File file : files) {
if(file.isDirectory()){
MatchCounter counter = new MatchCounter(file, keyword);
FutureTask
task = new FutureTask (counter); results.add(task);
Thread t = new Thread(task);
t.start();
}else{
if(search(file)){
count++;
}
}
}
for(Future
result:results){ try{
count += result.get();
}catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
return count;
}
public boolean search(File file){
try{
Scanner in = new Scanner(new FileInputStream(file));
boolean found = false;
while(!found && in.hasNextLine()){
String line = in.nextLine();
if(line.contains(keyword)){
found = true;
}
}
in.close();
return found;
}catch (IOException e) {
return false;
}
}
}
Futrue模式有个重大缺陷:当消费者工作得不够快的时候,它会阻塞住生产者线程,从而可能导致系统吞吐量的下降。所以不建议在高性能的服务端使用。
FutureTask
FutureTask则是一个RunnableFuture
public class FutureTask
implements RunnableFuture RunnableFuture
public interface RunnableFuture
extends Runnable, Future { /**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
另外它还可以包装Runnable和Callable
, 由构造函数注入依赖。 public FutureTask(Callable
callable) { if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到,Runnable注入会被Executors.callable()函数转换为Callable类型,即FutureTask最终都是执行Callable类型的任务。该适配函数的实现如下 :
public static
Callable callable(Runnable task, T result) { if (task == null)
throw new NullPointerException();
return new RunnableAdapter
(task, result); }
RunnableAdapter适配器
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter
implements Callable { final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。
并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、
Runnable,****又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体。
简单示例
package com.effective.java.concurrent.task;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
*
* @author mrsimple
*
*/
public class RunnableFutureTask {
/**
* ExecutorService
*/
static ExecutorService mExecutor = Executors.newSingleThreadExecutor();
/**
*
* @param args
*/
public static void main(String[] args) {
runnableDemo();
futureDemo();
}
/**
* runnable, 无返回值
*/
static void runnableDemo() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("runnable demo : " + fibc(20));
}
}).start();
}
/**
* 其中Runnable实现的是void run()方法,无返回值;Callable实现的是 V
* call()方法,并且可以返回执行结果。其中Runnable可以提交给Thread来包装下
* ,直接启动一个线程来执行,而Callable则一般都是提交给ExecuteService来执行。
*/
static void futureDemo() {
try {
/**
* 提交runnable则没有返回值, future没有数据
*/
Future<?> result = mExecutor.submit(new Runnable() {
@Override
public void run() {
fibc(20);
}
});
System.out.println("future result from runnable : " + result.get());
/**
* 提交Callable, 有返回值, future中能够获取返回值
*/
Future
result2 = mExecutor.submit(new Callable () { @Override
public Integer call() throws Exception {
return fibc(20);
}
});
System.out
.println("future result from callable : " + result2.get());
/**
* FutureTask则是一个RunnableFuture
,即实现了Runnbale又实现了Futrue 这两个接口, * 另外它还可以包装Runnable(实际上会转换为Callable)和Callable
*
,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行 * ,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。
*/
FutureTask
futureTask = new FutureTask ( new Callable
() { @Override
public Integer call() throws Exception {
return fibc(20);
}
});
// 提交futureTask
mExecutor.submit(futureTask) ;
System.out.println("future result from futureTask : "
+ futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 效率底下的斐波那契数列, 耗时的操作
*
* @param num
* @return
*/
static int fibc(int num) {
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fibc(num - 1) + fibc(num - 2);
}
}