流水线
什么是流水线?
在计算机中,对于一条具体的指令执行过程,通常可以分为五个部分:取指令,指令译码,取操作数,运算 (ALU),写结果。 前三步由指令控制器完成,后两步则由运算器完成。 按照传统的方式,所有指令顺序执行,那么先是指令控制器工作,完成第一条指令的前三步,然后运算器工作,完成后两步,第一条指令执行完毕。然后第二条指令又是先指令控制器工作,完成前三步,然后运算器,完成第二条指令的后两部…… 传统方式有个很大的缺点就是,指令只能一条一条地执行,仔细分析一下就会发现,这种方式存在很大的资源浪费:即同一时刻,要么指令控制器工作,运算器闲着;要么运算器工作,指令控制器闲着。这样一方面资源得不到有效利用,另一方面就是工作效率很低。 流水线的出现就是为了解决这个问题,下面我们来看一下流水线的工作模式: 假设有两个指令INS_A和INS_B,它们的执行分别要经过A,B,C,D四个过程,假设A到D四个过程分别由四个硬件元件完成。按照传统的方式,它们的流程如下: 这种方式的缺点就是,只能一条指令一条指令的执行,并且当指令执行到过程B的时候,处理过程A和CD的元件是处于空闲状态的。
流水线方式如下:
说明一下,通过流水线的方式,当INS_A指令执行完过程A之后,处理过程A的元件就空闲了,此时我们就开始处理指令INS_B的A阶段,这样一来,INS_B指令只需要等到INS_A的A过程执行完成就可以继续执行了,这样以来就在很大程度上提高了效率。
流水线中断
使用流水线能够很大程度提高程序执行效率,这点是毋庸置疑的,但是,在系统中,每当引入一个新的模式或者组件的时候,我们就需要对应处理该模式或者组件所带来的问题,那么引入流水线的一个很大的问题就是流水线中断。
产生中断一般是由两个原因造成的,一个是“相关”,一个是“条件转移”。 相关:指的是一条指令的执行需要依赖前一条指令的执行结果。拿上面的例子,假设INS_B指令在执行过程C的时候,需要使用INS_A指令过程D的结果,那么指令INS_B在执行到C的时候,由于A指令的D过程还没有执行完成,所以此时INS_B就不能继续执行,否则就会拿到错误结果。所以此时就需要将流水线中断,从而等待指令INS_A的D过程的结束。如下图所示:
条件转移:如果一条指令是条件转移指令,即指令执行结果是根据条件发生变化的,那么系统就不清楚下面应该执行哪一条指令,这时就必须等第一条指令的判断结果出来才能执行第二条指令。条件转移所造成的流水线停顿甚至比相关还要严重的多。
所以:流水线虽然能够提高整体运行效率,但是在某些情况下,需要中断流水线,以保证程序运行正确,而流水线的中断又会降低系统运行效率。
流水线在Java中的应用
目前在java提供的实现中,并没有用到流水线模式,但是我们知道JVM虚拟机在运行时对我们的代码进行的指令重排序,目的就是为了减少流水线的中断,从而提高流水线运行效率。
在实际的项目开发中,我们也可以学习流水线模式的思想,将业务流程拆分成多个子流程,然后采用流水线的方式进行,以减少程序等待。 比如有一个操作涉及到1)查询数据库,2)本地处理数据,3)远程RPC通知结果 三个过程,其中过程1和过程3都涉及到网络IO操作,所以整体运行是比较耗时的。但是我们可以采用流水线作业的方式,这样就可以充分利用三部分的资源。提高系统的整体运行效率。
Commons Pipeline
apache基金会下的一个项目,提供了流水线操作的框架。参考这里 PS:这个框架貌似好久都没有维护过了,不知道为啥。。
自己实现一个简单的流水线
在黄文海先生的《Java多线程实战指南(设计模式篇)》书中,详细讲解了流水线的java实现,另外,可以参考它书中的源码,点击这里
我也简单写了一个,假设有一个任务需要分为三步执行,那么可以将每一步抽象成一个任务阶段(TaskStage),然后每个任务阶段都被提交到队列中,然后三个线程分别处理三个队列中的数据。 TaskStage
package pipeline;
/**
* 任务阶段,比如一个任务分为三个阶段,每一个阶段都有一个TaskStage与之对应
*/
public interface TaskStage {
/**
* 处理任务,返回值为该任务阶段的下一个阶段
*/
public TaskStage process();
}
三个具体的任务阶段: 数据库查询任务阶段
package pipeline;
/**
* 查询数据库阶段
*/
public class DBQueryTaskStage implements TaskStage {
@Override
public TaskStage process() {
String result = queryDB();
System.out.println(result);
return new CalculateTaskStage(result);
}
/**
* 模拟查询数据库过程
*
* @return
*/
private String queryDB() {
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "DBQueryTaskStage";
}
}
计算处理结果任务阶段
package pipeline;
public class CalculateTaskStage implements TaskStage {
private String data;
public CalculateTaskStage(String data) {
super();
this.data = data;
}
@Override
public TaskStage process() {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = calculate(data);
System.out.println(result);
return new RpcTaskStage(result);
}
private String calculate(String data2) {
return data2 + ",CaculateTaskStage";
}
}
远程RPC调用任务阶段
package pipeline;
public class RpcTaskStage implements TaskStage {
private String data;
public RpcTaskStage(String data) {
super();
this.data = data;
}
@Override
public TaskStage process() {
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data + ",RpcTaskStage");
return null;
}
}
接下来需要有一个流水线类,用于任务调度
package pipeline;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* 整条流水线
*/
public class Pipeline {
// 用于存放任务的第一阶段
private BlockingDeque<TaskStage> firstStageQueue = new LinkedBlockingDeque<TaskStage>();
// 用于存储任务的第二阶段
private BlockingDeque<TaskStage> secondStageQueue = new LinkedBlockingDeque<TaskStage>();
// 用于存储任务的第三阶段
private BlockingDeque<TaskStage> thirdStageQueue = new LinkedBlockingDeque<TaskStage>();
public Pipeline() {
super();
// 启动三个线程,分别处理三个阶段的任务
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = firstStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (taskStage != null) {
TaskStage nextStage = taskStage.process();
try {
secondStageQueue.put(nextStage);
} catch (InterruptedException e) {
}
} else { // 取出来的数据为空,则终止(这种情况可能会有问题,这里做简单演示)
break;
}
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = secondStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (taskStage != null) {
TaskStage nextStage = taskStage.process();
try {
thirdStageQueue.put(nextStage);
} catch (InterruptedException e) {
}
} else {
break;
}
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true) {
TaskStage taskStage = null;
try {
taskStage = thirdStageQueue.poll(80, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (taskStage != null) {
taskStage.process();
} else {
break;
}
}
}
}.start();
}
public void process(TaskStage firstStage) {
try {
firstStageQueue.put(firstStage);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
主程序入口:
package pipeline;
import java.util.concurrent.locks.LockSupport;
public class Main {
public static void main(String[] args) {
Pipeline line = new Pipeline();
for (int i = 0; i < 10; i++) {
line.process(new DBQueryTaskStage());
}
LockSupport.park();
}
}
PS:上面的程序只是简单演示,会有一些线程调度的问题。
参考资料
- 流水线
- 《Java多线程实战指南(设计模式篇)》黄文海