Tiny并行计算框架之复杂示例

Easter79
• 阅读 527

问题来源

非常感谢@doctorwho的问题:

假如职业介绍所来了一批生产汽车的工作,假设生产一辆汽车任务是这样的:搭好底盘、拧4个轮胎、安装发动机、安装4个座椅、再装4个车门、最后安装顶棚。之间有的任务是可以并行计算的(比如拧4个轮胎,安装发动机和安装座椅),有的任务有前置任务(比如先装好座椅,才能装车门和顶棚)。让两组包工头组织两种类型的工作:将工人分成两种类型,即可并行计算的放在同一组内,由职业介绍所来控制A组包工头做完的任务交给B组包工头。中间环节的半成品保存到Warehouse中,是这样使用TINY框架来生产汽车么?

接下来,我就用Tiny并行计算框架来展示一下这个示例,在编写示例的时候,发现了一个BUG,这也充分体现了开源的精神与价值,再次感谢@doctorwho。

问题分析

doctorwho的问题还是比较复杂的,但是实际上道理是一样的,因此我把问题简化成下面的过程

第一步:构建底盘

第二步:并行进行安装引擎,座位和轮胎

第三步:并行进行安装门及车顶

由于我和doctorwho都不是造车行家,因此就不用纠结这么造是不是合理了,假设这么做就是合理的。

代码实现

按我前面说的过程,工人是必须要有的,因此我们首先构建工人:

第一步的底盘构建工人

public class StepFirstWorker extends AbstractWorker {
    public StepFirstWorker() throws RemoteException {
        super("first");
    }

    @Override
    protected Warehouse doWork(Work work) throws RemoteException {
        System.out.println(String.format("%s 构建底盘完成.", work.getInputWarehouse().get("carType")));
        Warehouse outputWarehouse = work.getInputWarehouse();
        outputWarehouse.put("baseInfo", "something about baseInfo");
        return outputWarehouse;
    }
}

由于第二步工人有好几个类型,因此再搞个第二步抽象工人:

public abstract class StepThirdWorker extends AbstractWorker {
    public StepThirdWorker() throws RemoteException {
        super("third");
    }


    protected boolean acceptMyWork(Work work) {
        String workClass = work.getInputWarehouse().get("class");
        if (workClass != null) {
            return true;
        }
        return false;
    }
    protected Warehouse doMyWork(Work work) throws RemoteException {
        System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));
        System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));
        return work.getInputWarehouse();
    }
}

接下来构建第二步的引擎工人:

public class StepSecondEngineWorker extends StepSecondWorker {


 public static final String ENGINE = "engine";


 public StepSecondEngineWorker() throws RemoteException {
 super();
 }


 public boolean acceptWork(Work work) {
 return acceptMyWork(work);
 }


 protected Warehouse doWork(Work work) throws RemoteException {
 return super.doMyWork(work);
 }
}

第二步的座位工人:

public class StepSecondSeatWorker extends StepSecondWorker {


    public static final String SEAT = "seat";


    public StepSecondSeatWorker() throws RemoteException {
        super();
    }
    public boolean acceptWork(Work work) {
       return acceptMyWork(work);
    }
    protected Warehouse doWork(Work work) throws RemoteException {
        return super.doMyWork(work);
    }
}


  
  
  



  
  
  

第二步的轮胎工人:

public class StepSecondTyreWorker extends StepSecondWorker {
 public static final String TYRE = "tyre";


 public StepSecondTyreWorker() throws RemoteException {
 super();
 }


 public boolean acceptWork(Work work) {
 return acceptMyWork(work);
 }


 protected Warehouse doWork(Work work) throws RemoteException {
 return super.doMyWork(work);
 }
}

同理,第三步也是大同小异的。

第三步的抽象工人类:

public abstract class StepThirdWorker extends AbstractWorker {
    public StepThirdWorker() throws RemoteException {
        super("third");
    }


    protected boolean acceptMyWork(Work work) {
        String workClass = work.getInputWarehouse().get("class");
        if (workClass != null) {
            return true;
        }
        return false;
    }
    protected Warehouse doMyWork(Work work) throws RemoteException {
        System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));
        System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));
        return work.getInputWarehouse();
    }
}

第三步的车门工人:

public class StepThirdDoorWorker extends StepThirdWorker {


    public static final String DOOR = "door";


    public StepThirdDoorWorker() throws RemoteException {
        super();
    }
    public boolean acceptWork(Work work) {
        return acceptMyWork(work);
    }
    @Override
    protected Warehouse doWork(Work work) throws RemoteException {
        return super.doMyWork(work);
    }
}

第三步的车顶工人:

public class StepThirdRoofWorker extends StepThirdWorker {


    public static final String ROOF = "roof";


    public StepThirdRoofWorker() throws RemoteException {
        super();
    }
    public boolean acceptWork(Work work) {
        return acceptMyWork(work);
    }
    protected Warehouse doWork(Work work) throws RemoteException {
        return super.doMyWork(work);
    }
}

以上就把工人都构建好了,我们前面也说过,如果要进行任务分解,是必须要构建任务分解合并器的,这里简单起见,只实现任务分解了。

第二部的任务分解:

public class SecondWorkSplitter implements WorkSplitter {
    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
        List<Warehouse> list = new ArrayList<Warehouse>();
        list.add(getWareHouse(work.getInputWarehouse(), "engine"));
        list.add(getWareHouse(work.getInputWarehouse(), "seat"));
        list.add(getWareHouse(work.getInputWarehouse(), "seat"));
        list.add(getWareHouse(work.getInputWarehouse(), "seat"));
        list.add(getWareHouse(work.getInputWarehouse(), "seat"));
        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
        return list;
    }

    private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
        Warehouse warehouse = new WarehouseDefault();
        warehouse.put("class", stepClass);
        warehouse.putSubWarehouse(inputWarehouse);
        return warehouse;
    }
}

从上面可以看到,构建了一个引擎的仓库,4个座位仓库,4个轮胎仓库。呵呵,既然能并行,为啥不让他做得更快些?

接下来是第三步的任务分解器:

public class ThirdWorkSplitter implements WorkSplitter {
    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
        List<Warehouse> list = new ArrayList<Warehouse>();
        list.add(getWareHouse(work.getInputWarehouse(), "door"));
        list.add(getWareHouse(work.getInputWarehouse(), "door"));
        list.add(getWareHouse(work.getInputWarehouse(), "door"));
        list.add(getWareHouse(work.getInputWarehouse(), "door"));
        list.add(getWareHouse(work.getInputWarehouse(), "roof"));
        return list;
    }

    private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
        Warehouse warehouse = new WarehouseDefault();
        warehouse.put("class", stepClass);
        warehouse.putSubWarehouse(inputWarehouse);
        return warehouse;
    }
}

从上面可以看到,第三部构建了4个门仓库一个车顶仓库,同样的,可以让4个工人同时装门。

上面就把所有的准备工作都做好了,接下来就是测试方法了:

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        JobCenter jobCenter = new JobCenterLocal();


        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepFirstWorker());
        }
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepSecondTyreWorker());
        }
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepSecondSeatWorker());
        }
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepSecondEngineWorker());
        }
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepThirdDoorWorker());
        }
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new StepThirdRoofWorker());
        }

        jobCenter.registerForeman(new ForemanSelectOneWorker("first"));
      jobCenter.registerForeman(new ForemanSelectAllWorker("second",
 new SecondWorkSplitter()));
 jobCenter.registerForeman(new ForemanSelectAllWorker("third",new ThirdWorkSplitter()));


 Warehouse inputWarehouse = new WarehouseDefault();
 inputWarehouse.put("class", "car");
 inputWarehouse.put("carType", "普桑");
 WorkDefault work = new WorkDefault("first", inputWarehouse);
 work.setForemanType("first");
 WorkDefault work2 = new WorkDefault("second");
 work2.setForemanType("second");
 WorkDefault work3 = new WorkDefault("third");
 work3.setForemanType("third");
 work.setNextWork(work2).setNextWork(work3);


 Warehouse warehouse = jobCenter.doWork(work);


 jobCenter.stop();

    }
}

呵呵,工人各加了5个,然后注册了三个工头,第一步的工头是随便挑一个工人类型的,第二步和第三步是挑所有工人的,同时还指定了任务分解器。

接下来就构建了一个工作,造一个高端大气上档次的普桑汽车,然后告诉职业介绍所说给我造就可以了。

下面是造车的过程,我把日志也贴上来了:

普桑 构建底盘完成.
-234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数9...
-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行开始...
-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行开始...
-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行开始...
-235  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行开始...
-236  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行开始...
-237  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行开始...
Base:something about baseInfo 
engine is Ok
Base:something about baseInfo 
seat is Ok
-245  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行结束
-246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行开始...
-246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行结束
Base:something about baseInfo 
seat is Ok
-248  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行开始...
Base:something about baseInfo 
tyre is Ok
-250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行结束
-250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行结束
Base:something about baseInfo 
seat is Ok
-252  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行开始...
-253  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行结束
Base:something about baseInfo 
seat is Ok
Base:something about baseInfo 
tyre is Ok
-257  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行结束
-258  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行结束
Base:something about baseInfo 
tyre is Ok
Base:something about baseInfo 
tyre is Ok
-262  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行结束
-264  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行结束
-264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms
-333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数5...
-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行开始...
-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行开始...
-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行开始...
-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行开始...
Base:something about baseInfo 
door is Ok
Base:something about baseInfo 
door is Ok
Base:something about baseInfo 
-338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行结束
door is Ok
-339  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行开始...
-338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行结束
Base:something about baseInfo 
door is Ok
Base:something about baseInfo 
-340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行结束
roof is Ok
-340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行结束
-342  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行结束
-343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms

从上面的日志可以看出:

由于第一步工作是挑一个单干的,因此是没有启用线程组的

第二步同时有9个线程干活:

-234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数9...
...
-264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms

第三步同时有5个线程干活:

-333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数5...
...
-343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms

总结:

Tiny并行计算框架确实是可以方便的解决各种复杂并行计算的问题。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k