Java多线程模式之流水线模式

Wesley13
• 阅读 1004

流水线

什么是流水线?

在计算机中,对于一条具体的指令执行过程,通常可以分为五个部分:取指令,指令译码,取操作数,运算 (ALU),写结果。 前三步由指令控制器完成,后两步则由运算器完成。 按照传统的方式,所有指令顺序执行,那么先是指令控制器工作,完成第一条指令的前三步,然后运算器工作,完成后两步,第一条指令执行完毕。然后第二条指令又是先指令控制器工作,完成前三步,然后运算器,完成第二条指令的后两部…… 传统方式有个很大的缺点就是,指令只能一条一条地执行,仔细分析一下就会发现,这种方式存在很大的资源浪费:即同一时刻,要么指令控制器工作,运算器闲着;要么运算器工作,指令控制器闲着。这样一方面资源得不到有效利用,另一方面就是工作效率很低。 流水线的出现就是为了解决这个问题,下面我们来看一下流水线的工作模式: 假设有两个指令INS_A和INS_B,它们的执行分别要经过A,B,C,D四个过程,假设A到D四个过程分别由四个硬件元件完成。按照传统的方式,它们的流程如下: Java多线程模式之流水线模式 这种方式的缺点就是,只能一条指令一条指令的执行,并且当指令执行到过程B的时候,处理过程A和CD的元件是处于空闲状态的。

流水线方式如下:
Java多线程模式之流水线模式

说明一下,通过流水线的方式,当INS_A指令执行完过程A之后,处理过程A的元件就空闲了,此时我们就开始处理指令INS_B的A阶段,这样一来,INS_B指令只需要等到INS_A的A过程执行完成就可以继续执行了,这样以来就在很大程度上提高了效率。

流水线中断

使用流水线能够很大程度提高程序执行效率,这点是毋庸置疑的,但是,在系统中,每当引入一个新的模式或者组件的时候,我们就需要对应处理该模式或者组件所带来的问题,那么引入流水线的一个很大的问题就是流水线中断。

产生中断一般是由两个原因造成的,一个是“相关”,一个是“条件转移”。 相关:指的是一条指令的执行需要依赖前一条指令的执行结果。拿上面的例子,假设INS_B指令在执行过程C的时候,需要使用INS_A指令过程D的结果,那么指令INS_B在执行到C的时候,由于A指令的D过程还没有执行完成,所以此时INS_B就不能继续执行,否则就会拿到错误结果。所以此时就需要将流水线中断,从而等待指令INS_A的D过程的结束。如下图所示:
Java多线程模式之流水线模式

条件转移:如果一条指令是条件转移指令,即指令执行结果是根据条件发生变化的,那么系统就不清楚下面应该执行哪一条指令,这时就必须等第一条指令的判断结果出来才能执行第二条指令。条件转移所造成的流水线停顿甚至比相关还要严重的多。

所以:流水线虽然能够提高整体运行效率,但是在某些情况下,需要中断流水线,以保证程序运行正确,而流水线的中断又会降低系统运行效率。

流水线在Java中的应用

目前在java提供的实现中,并没有用到流水线模式,但是我们知道JVM虚拟机在运行时对我们的代码进行的指令重排序,目的就是为了减少流水线的中断,从而提高流水线运行效率。

在实际的项目开发中,我们也可以学习流水线模式的思想,将业务流程拆分成多个子流程,然后采用流水线的方式进行,以减少程序等待。 比如有一个操作涉及到1)查询数据库,2)本地处理数据,3)远程RPC通知结果 三个过程,其中过程1和过程3都涉及到网络IO操作,所以整体运行是比较耗时的。但是我们可以采用流水线作业的方式,这样就可以充分利用三部分的资源。提高系统的整体运行效率。

Commons Pipeline

apache基金会下的一个项目,提供了流水线操作的框架。参考这里 PS:这个框架貌似好久都没有维护过了,不知道为啥。。

自己实现一个简单的流水线

在黄文海先生的《Java多线程实战指南(设计模式篇)》书中,详细讲解了流水线的java实现,另外,可以参考它书中的源码,点击这里

我也简单写了一个,假设有一个任务需要分为三步执行,那么可以将每一步抽象成一个任务阶段(TaskStage),然后每个任务阶段都被提交到队列中,然后三个线程分别处理三个队列中的数据。 TaskStage

package pipeline;

/**
 * 任务阶段,比如一个任务分为三个阶段,每一个阶段都有一个TaskStage与之对应
 */
public interface TaskStage {

    /**
     * 处理任务,返回值为该任务阶段的下一个阶段
     */
    public TaskStage process();
}

三个具体的任务阶段: 数据库查询任务阶段

package pipeline;

/**
 * 查询数据库阶段
 */
public class DBQueryTaskStage implements TaskStage {

    @Override
    public TaskStage process() {
        String result = queryDB();
        System.out.println(result);
        return new CalculateTaskStage(result);
    }

    /**
     * 模拟查询数据库过程
     * 
     * @return
     */
    private String queryDB() {
        try {
            Thread.sleep(1000 * 20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "DBQueryTaskStage";
    }

}

计算处理结果任务阶段

package pipeline;

public class CalculateTaskStage implements TaskStage {
    private String data;

    public CalculateTaskStage(String data) {
        super();
        this.data = data;
    }

    @Override
    public TaskStage process() {
        try {
            Thread.sleep(1000 * 5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String result = calculate(data);
        System.out.println(result);
        return new RpcTaskStage(result);
    }

    private String calculate(String data2) {
        return data2 + ",CaculateTaskStage";
    }

}

远程RPC调用任务阶段

package pipeline;

public class RpcTaskStage implements TaskStage {

    private String data;

    public RpcTaskStage(String data) {
        super();
        this.data = data;
    }

    @Override
    public TaskStage process() {
        try {
            Thread.sleep(1000 * 3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(data + ",RpcTaskStage");
        return null;
    }

}

接下来需要有一个流水线类,用于任务调度

package pipeline;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * 整条流水线
 */
public class Pipeline {

    // 用于存放任务的第一阶段
    private BlockingDeque<TaskStage> firstStageQueue = new LinkedBlockingDeque<TaskStage>();

    // 用于存储任务的第二阶段
    private BlockingDeque<TaskStage> secondStageQueue = new LinkedBlockingDeque<TaskStage>();

    // 用于存储任务的第三阶段
    private BlockingDeque<TaskStage> thirdStageQueue = new LinkedBlockingDeque<TaskStage>();

    public Pipeline() {
        super();

        // 启动三个线程,分别处理三个阶段的任务
        new Thread() {

            @Override
            public void run() {
                while (true) {
                    TaskStage taskStage = null;
                    try {
                        taskStage = firstStageQueue.poll(80, TimeUnit.SECONDS);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    if (taskStage != null) {
                        TaskStage nextStage = taskStage.process();
                        try {
                            secondStageQueue.put(nextStage);
                        } catch (InterruptedException e) {
                        }
                    } else { // 取出来的数据为空,则终止(这种情况可能会有问题,这里做简单演示)
                        break;
                    }
                }
            }
        }.start();

        new Thread() {

            @Override
            public void run() {
                while (true) {
                    TaskStage taskStage = null;
                    try {
                        taskStage = secondStageQueue.poll(80, TimeUnit.SECONDS);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    if (taskStage != null) {
                        TaskStage nextStage = taskStage.process();
                        try {
                            thirdStageQueue.put(nextStage);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        break;
                    }
                }
            }
        }.start();

        new Thread() {

            @Override
            public void run() {
                while (true) {
                    TaskStage taskStage = null;
                    try {
                        taskStage = thirdStageQueue.poll(80, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (taskStage != null) {
                        taskStage.process();
                    } else {
                        break;
                    }
                }
            }
        }.start();
    }

    public void process(TaskStage firstStage) {
        try {
            firstStageQueue.put(firstStage);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

主程序入口:

package pipeline;

import java.util.concurrent.locks.LockSupport;

public class Main {
    public static void main(String[] args) {
        Pipeline line = new Pipeline();

        for (int i = 0; i < 10; i++) {
            line.process(new DBQueryTaskStage());
        }

        LockSupport.park();
    }
}

PS:上面的程序只是简单演示,会有一些线程调度的问题。

参考资料

  1. 流水线
  2. 《Java多线程实战指南(设计模式篇)》黄文海
点赞
收藏
评论区
推荐文章
桃浪十七丶 桃浪十七丶
3年前
计算机组成原理4.2指令寻址方式
4.2.1指令寻址和数据寻址4.2.1.1指令寻址:顺序寻址:取出指令后PC指向下一条需执行指令的地址。1)顺序寻址:定长指令字结构寻址:假设指令字长存储字长16bit2Byte,且主存按字编址。则每次取出指令后PC1;若主存按字节编址,意味着每条指令都会占两个地址,则每次取出指令后PC2。变长指令字结构寻址:指令字长存储字
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
桃浪十七丶 桃浪十七丶
3年前
计算机组成原理4.1指令格式
4.1.1指令的基本概念和指令的基本格式操作码:指明CPU进行什么操作。地址码:知指明CPU对谁进行操作。PC:程序计数器,每执行一条指令会1指向下一条指令。指令的概念和基本格式:是指一台计算机执行某种操作的命令,一台计算机的所有指令的集合构成指令集,也叫做指令系统,位于计算机的硬件和OS层面。不同计算机只能执行自己系统的指令,如Intel的x86架构,手
Wesley13 Wesley13
3年前
PIC中档单片机汇编指令详解(5)
位操作指令详述BCF数据寄存器指定位清0语法形式:BCFf,b操作数:f为数据寄存器的低7位地址(0x00~0x7F)B为数据位编号(0~7)执行时间:一个指令周期执行过程:使数据寄存器f的的b位清0状态标志影响:无说明:该指令可对任何数据寄存器的任意一个位置清0,常用于标志位的设定和清除,或者把某一管脚置成低电平。指
Stella981 Stella981
3年前
Dockerfile指令详解
Dockerfile中包括FROM、MAINTAINER、RUN、CMD、EXPOSE、ENV、ADD、COPY、ENTRYPOINT、VOLUME、USER、WORKDIR、ONBUILD等13个指令。下面一一讲解。1.FROM格式为FROMimage或FROMimage:tag,并且Dockerfile中第一条指令必须是FROM指令
Wesley13 Wesley13
3年前
Java虚拟机(一):JVM简介
JVM简介Java虚拟机(JVM)是由Java虚拟机规范定义的,其上运行的是字节码指令集。这种字节码指令集包含一个字节的操作码(opcode),零至多个操作数(oprand),虚拟机规范明确定义了每种字节码指令完成的功能是什么以及需要多少个操作数。Java虚拟机上运行的class文件,这个文件中包含字节码指令流以及类定义的信息,所以Java虚
Wesley13 Wesley13
3年前
CPU知识点一览
关于CPU和程序的执行1、程序的运行过程,实际上是程序涉及到的、未涉及到的一大堆的指令的执行过程。当程序要执行的部分被装载到内存后,CPU要从内存中取出指令,然后指令解码(以便知道类型和操作数,简单的理解为CPU要知道这是什么指令),然后执行该指令。再然后取下一个指令、解码、执行,以此类推直到程序退出。2、这个取指、
Stella981 Stella981
3年前
JVM中即时编译器JIT与解释器并存
一.学习目标1.了解解释器与编译器的概念与作用。2.知道jvm中三种执行模式。3.了解热点代码。二.解释器模式与编译器模式以及混合模式  字节码文件通过类装载器装载,被分配被分配到JVM的运行时数据区,然后会被执行引擎执行。执行引擎以指令为单位读取Java字节码。它就像一个CPU一样,一条一条地执行机器指令。每个字节码指令
Stella981 Stella981
3年前
Dockerfile指令:
Dockerfile指令:第一行注释,指令是大写字母开头,FROM指令:FROM<image,后面跟镜像名,FROM<image:<tag,后面跟镜像名和标签名,必须是已经存在的镜像,后续指令都是基于这个镜像来执行的,这个镜像也叫基础镜像,必须是第一条非注释指令,FROMubuntu:14.0
Stella981 Stella981
3年前
JVM基础命令
介绍java虚拟机的指令功能,至少能阅读java代码生成的字节码指令含义一、概述Java虚拟机采用基于栈的架构,其指令由操作码和操作数组成。操作码:一个字节长度(0~255),意味着指令集的操作码个数不能操作256条。操作数:一条指令可以有零或者多个操作数,且操作数可以是1个或者多个字节。编译后的代码没有采用操作数