Java并发新构件之CounDownLatch

Wesley13
• 阅读 664

    CountDownLatch主要用于同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。

    你可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用await()的方法都将阻塞,直到这个计数值达到0。其他任务在结束其工作时,可以在该对象上调用countDown()来减小这个计数值,你可以通过调用getCount()方法来获取当前的计数值。CountDownLatch被设计为只触发一次,计数值不能被重置。如果你需要能够重置计数值的版本,则可以使用CyclicBarrier。

    调用countDown()的任务在产生这个调用时并没有阻塞,只有对await()的调用会被阻塞,直到计数值到达0。

    CountDownLatch的典型用法是将一个程序分为n个互相独立的可解决任务,并创建值为0的CountDownLatch。当每个任务完成时,都会在这个锁存器上调用countDown()。等待问题被解决的任务在这个锁存器上调用await(),将它们自己锁住,直到锁存器计数结束。下面是演示这种技术的一个框架示例:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskPortion implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random();
    private final CountDownLatch latch;
    public TaskPortion(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
        try {
            doWork();
            latch.countDown();//普通任务执行完后,调用countDown()方法,减少count的值
            System.out.println(this + " completed. count=" + latch.getCount());
        } catch (InterruptedException e) {
            
        }
    }
    
    public void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
    }
    
    @Override
    public String toString() {
        return String.format("%1$-2d ", id);
    }
}

class WaitingTask implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch latch;
    public WaitingTask(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
        try {
            //这些后续任务需要等到之前的任务都执行完成后才能执行,即count=0时
            latch.await();
            System.out.println("Latch barrier passed for " + this);
        } catch (InterruptedException e) {
            System.out.println(this + " interrupted.");
        }
    }
    
    @Override
    public String toString() {
        return String.format("WaitingTask %1$-2d ", id);
    }
}

public class CountDownLatchDemo {
    static final int SIZE = 10;
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(SIZE);
        ExecutorService exec = Executors.newCachedThreadPool();
        //5个WaitingTask
        for (int i = 0; i < 5; i++) {
            exec.execute(new WaitingTask(latch));
        }
        //10个任务,这10个任务要先执行才会执行WaitingTask
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new TaskPortion(latch));
        }
        System.out.println("Launched all tasks.");
        exec.shutdown();//当所有的任务都结束时,关闭exec
    }
}

执行结果(可能的结果):

Launched all tasks.
4   completed. count=9
6   completed. count=8
3   completed. count=7
0   completed. count=6
2   completed. count=5
1   completed. count=4
5   completed. count=3
7   completed. count=2
9   completed. count=1
8   completed. count=0
Latch barrier passed for WaitingTask 0  
Latch barrier passed for WaitingTask 2  
Latch barrier passed for WaitingTask 1  
Latch barrier passed for WaitingTask 3  
Latch barrier passed for WaitingTask 4

    从结果中可以看到,所有的WaitingTask都是在所有的TaskPortion执行完成之后执行的。

    TaskPortion将随机的休眠一段时间,以模拟这部分工作的完成。而WaitingTask表示系统中必须等待的部分,它要等到问题的初始部分完成后才能执行。注意:所有任务都使用了在main()中定义的同一个CountDownLatch对象

一个更实际的例子(参考自这里,有改动):N个选手比赛跑步,在枪响后同时起跑,全部到达终点后比赛结束,下面是代码:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 用CountDownLatch来模拟运动员赛跑的Demmo.
 */
public class CountDownLatchDemo {
    /** 参与比赛的人数(并发线程数) */
    private static int PLAYER_NUM = 8;
    
    public static void main(String[] args) throws Exception {
        //用于让主线程(裁判)等待所有子线程(运动员)就绪
        final CountDownLatch readySignal = new CountDownLatch(PLAYER_NUM);
        //用于让子线程(运动员)等待主线程(裁判)发号施令
        final CountDownLatch beginSignal = new CountDownLatch(1);
        //用于让主线程(裁判)等待所有子线程(运动员)执行(比赛)完成
        final CountDownLatch endSignal = new CountDownLatch(PLAYER_NUM);
        ExecutorService executorService = Executors.newFixedThreadPool(PLAYER_NUM);
        
        for(int i = 0; i < PLAYER_NUM; i++){
            final int num = i + 1;
            Runnable player = new Runnable(){
                @Override
                public void run() {
                    System.out.println("No." + num + " is ready");
                    //一个任务就绪后,减少readySignal上的等待
                    readySignal.countDown();
                    try {
                        //等待主线程比赛开始的命令
                        beginSignal.await();
                        System.out.println("No." + num + " is running");
                        Thread.sleep((long) (Math.random() * 5000));
                        System.out.println("No.-----" + num + " arrived");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //每个线程执行完成后,调用countDown减少在endSignal上的等待线程数
                        endSignal.countDown();
                    }
                }
            };
            executorService.execute(player);
        }
        
        //这里让主线程等待,确保所有子线程已开始执行并调用了await进入等待状态
        readySignal.await();
        System.out.println("Game Start!");
        //所有等待在beginSignal上的线程(运动员)开始执行(比赛)
        beginSignal.countDown();
        try {
            //等待所有在endSignal上的线程(运动员)执行(比赛)完成
            endSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Game Over!");
            executorService.shutdown();
        }
    }
}

代码中用到了3个CountDownLatch对象,具体作用已在代码中说明,这里不赘述。

说一下执行过程:

  1. 主线程(main方法)启动后,向ExecutorService中提交了8个任务(子线程),每个任务开始执行后都在等待主线程说“比赛开始”。
  2. 主线程等待线程池中的任务全部都处于“ready”状态,然后鸣枪,比赛开始。
  3. 主线程等待所有任务执行完成。
  4. 子线程各自执行,然后陆续到达终点,完成比赛。
  5. 主线程宣告“比赛结束”,输出“Game Over!”。

下面是执行结果:

No.1 is ready
No.2 is ready
No.3 is ready
No.4 is ready
No.6 is ready
No.5 is ready
No.7 is ready
No.8 is ready
Game Start!
No.1 is running
No.4 is running
No.3 is running
No.6 is running
No.2 is running
No.5 is running
No.7 is running
No.8 is running
No.-----3 arrived
No.-----2 arrived
No.-----1 arrived
No.-----8 arrived
No.-----4 arrived
No.-----6 arrived
No.-----7 arrived
No.-----5 arrived
Game Over!
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java并发编程之二
CountDownLatch类  允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。  CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在
Wesley13 Wesley13
3年前
Java多线程并发控制工具CountDownLatch,实现原理及案例
闭锁(CountDownLatch)是Java多线程并发中的一种同步器,它是JDK内置的同步器。通过它可以定义一个倒计数器,当倒计数器的值大于0时,所有调用await方法的线程都会等待。而调用countDown方法则可以让倒计数器的值减一,当倒计数器值为0时所有等待的线程都将继续往下执行。闭锁的主要应用场景是让某个或某些线程在某个运行节点上等待N个条件都
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
CountDownLatch和CylicBarrier以及Semaphare你使用过吗
CountDownLatch是什么CountDownLatch的字面意思:倒计时门栓它的功能是:让一些线程阻塞直到另一些线程完成一系列操作后才唤醒。它通过调用await方法让线程进入阻塞状态等待倒计时0时唤醒。它通过线程调用countDown方法让倒计时中的计数器减去1,当计数器为0时,会唤醒哪些因为调用了await而阻塞的线程。
Wesley13 Wesley13
3年前
Java CyclicBarrier介绍
CyclicBarrier(周期障碍)类可以帮助同步,它允许一组线程等待整个线程组到达公共屏障点。CyclicBarrier是使用整型变量构造的,其确定组中的线程数。当一个线程到达屏障时(通过调用CyclicBarrier.await()),它会被阻塞,直到所有线程都到达屏障,然后在该点允许所有线程继续执行。与CountDownLatch不同的
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这