CycleBarrier与CountDownLatch原理

Stella981
• 阅读 660

CountDownLatch

众所周知,它能解决一个任务必须在其他任务完成的情况下才能执行的问题,代码层面来说就是只有计数countDown到0的时候,await处的代码才能继续向下运行,例如:

import java.util.*;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {

        CountDownLatch latch = new CountDownLatch(3);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        Future<Integer>[] futures = new Future[3];
        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                latch.countDown();
                return result;
            });
        }
        latch.await();
        System.out.println("合计每个任务的结果:" + (futures[0].get()+futures[1].get()+futures[2].get()));
    }

}

运行结果:

CycleBarrier与CountDownLatch原理

源码

实际上内部十分简单,里面只有一个AQS的子类

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 它把AQS的state(同步状态)作为计数器,在AQS里,state是个volatile标记的int变量
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        // 同步状态为0,则返回1,否则返回-1
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            // 如果状态为0则返回false
            if (c == 0)
                return false;
            // 计数器减1
            int nextc = c-1;
            // CAS操作,如果内存中的同步状态值等于期望值c,那么将同步状态设置为给定的更新值nextc
            if (compareAndSetState(c, nextc))
                return nextc == 0;  // 当计数器减到0,返回true
        }
    }
}

public void countDown() {
    sync.releaseShared(1);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

下面看具体做了什么事情

先来看await

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当计数器不等于0,返回-1,证明还有任务未执行完,进入下面方法等待
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 把当前线程包装成Node放入等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前线程的前驱节点,以检查等待状态
            final Node p = node.predecessor();
            if (p == head) {
                // 如果计数器等于0,返回1,证明此时阻塞可以解除了
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面的过程可以总结为:当进入await方法后,如果此时计数器不为0,则进入死循环一直检查计数器的值,直到为0退出,此时停止等待。

再来看countDown

public final boolean releaseShared(int arg) {
    // 尝试计数器减1,只有减到0才会返回true
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 等待状态为SIGNAL
            if (ws == Node.SIGNAL) {
                // 把当前节点的等待状态从SIGNAL设置成0,如果设置失败则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 成功的话则卸载当前节点的所有后继
                unparkSuccessor(h);
            }
            // 如果等待状态为0,则尝试将状态设置为PROPAGATE,如果设置失败则继续循环。
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

countDown的过程可以总结为:尝试将计数器-1,直到为0,为0的时候通知等待线程。

CycleBarrier

栏栅的作用就是让指定的一批任务能够同时开始执行,比如

import java.util.*;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);


        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        Future<Integer>[] futures = new Future[3];
        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                System.out.println("await|" + Thread.currentThread().getName());
                cyclicBarrier.await();
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                return result;
            });
        }
    }

}

运行结果

CycleBarrier与CountDownLatch原理

源码

进来之后首先发现的是成员变量

/** 用来保护栅栏入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待条件,直到计数器为0 */
private final Condition trip = lock.newCondition();
/** 参与线程的个数 */
private final int parties;
/* 计数器为0时要运行的命令,由用户定义 */
private final Runnable barrierCommand;
/** 当前等待的一代 */
private Generation generation = new Generation();
/**
 * parties数量的等待线程。每一代等待的数量从parties到0。当调用nextGeneration或者breakBarrier方法时重置。
 */
private int count;

从这里可以看出,除了内部实现用的ReentrantLock,其工作过程无非:计数器不为0的时候线程等待;当等待线程全部就绪,也就是计数器减为0的时候重置计数器并通知所有线程继续运行。

导致计数器重置原因有两个:一个就是发生异常,将当前这一代标记为无效(broken=true);另一个就是正常就绪,开启下一代(new Generation)

核心方法dowait

// 情况一:timed=false,nanos=0L,代表一直阻塞
// 情况二:timed=true,nanos!=0L,代表在超时时间内阻塞
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取当前这一代
        final Generation g = generation;

        // 如果当前这一代已经销毁,抛异常
        if (g.broken)
            throw new BrokenBarrierException();
        // 测试当前线程是否被中断
        if (Thread.interrupted()) {
            // 将broken设置为true,代表这一代已经销毁,重置count;然后通知所有等待线程
            breakBarrier();
            throw new InterruptedException();
        }
        // count 减1
        int index = --count;
        // 如果减1之后变成0,证明等待线程全部就绪。
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果用户定义了额外的命令,则执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 开启下一代(通知所有等待线程,重置count,new一个新的Generation)
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 如果减1之后不等于0,也就是还有其它线程没有就绪,那么进入此循环,直到就绪或者被销毁,或者被中断和超时
        for (;;) {
            try {
                if (!timed)
                    // 未定义超时,则一直阻塞
                    trip.await();
                else if (nanos > 0L)
                    // 等待指定的超时时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // 超时,则销毁这一代,通知所有等待线程并重置count
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

总结

两个工具实现思路都很简单,唯一我思考的是,为什么CountDownLatch只能用一次?

CycleBarrier很明显,它无论正常执行或者发生异常中断都有重置count的逻辑。

而CountDownLatch则没有重置的逻辑,那么,到底是CountDownLatch不能重置还是仅仅因为没有重置的逻辑。为此我把CountDownLatch的代码照搬,然后加上了简单的重置方法,如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;


public class MyCountDown {

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        /**
         * 新加
         * @param count
         */
        void reset(int count){
            // 重新设置状态
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    private final int count;

    public MyCountDown(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
        this.count = count;
    }


    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }


    public void countDown() {
        sync.releaseShared(1);
    }


    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

    /**
     * 新加
     */
    public void reset(){
        // 调用重置的方法
        this.sync.reset(count);
    }
}

测试:

import java.util.*;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {

        MyCountDown myCountDown = new MyCountDown(3);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        Future<Integer>[] futures = new Future[3];
        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(2000));   // 模拟耗时
                myCountDown.countDown();
                return result;
            });
        }
        myCountDown.await();
        System.out.println("第一次:" + (futures[0].get() + futures[1].get() + futures[2].get()));
        myCountDown.reset();    // 重置

        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(2000));   // 模拟耗时
                myCountDown.countDown();
                return result;
            });
        }
        myCountDown.await();
        System.out.println("如果重置无效,则这个信息会先于任务信息输出");
        System.out.println("第二次:" + (futures[0].get() + futures[1].get() + futures[2].get()));
    }

}

输出

CycleBarrier与CountDownLatch原理

如果换成CountDownLatch

import java.util.*;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {

        CountDownLatch latch = new CountDownLatch(3);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        Future<Integer>[] futures = new Future[3];
        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(2000));   // 模拟耗时
                latch.countDown();
                return result;
            });
        }
        latch.await();
        System.out.println("第一次:" + (futures[0].get() + futures[1].get() + futures[2].get()));

        for (int i = 0; i < 3; i++){
            futures[i] = executor.submit(() -> {
                Random rand = new Random();
                int n = rand.nextInt(100);
                int result = 0;
                for (int j = 0; j < n; j++){
                    result += j;
                }
                System.out.println(result + "|" + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(2000));   // 模拟耗时
                latch.countDown();
                return result;
            });
        }
        latch.await();
        System.out.println("如果重置无效,则这个信息会先于任务信息输出");
        System.out.println("第二次:" + (futures[0].get() + futures[1].get() + futures[2].get()));
    }

}

输出

CycleBarrier与CountDownLatch原理

 所以可以得出结论,CountDownLatch不是没有办法重置,只不过没有写相关逻辑。当然这个问题如果我说错了,望指正。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Java修道之路,问鼎巅峰,我辈代码修仙法力齐天
<center<fontcolor00FF7Fsize5face"黑体"代码尽头谁为峰,一见秃头道成空。</font<center<fontcolor00FF00size5face"黑体"编程修真路破折,一步一劫渡飞升。</font众所周知,编程修真有八大境界:1.Javase练气筑基2.数据库结丹3.web前端元婴4.Jav
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这