Java 多线程中使用 JDK 自带工具类实现计数器

Wesley13
• 阅读 530

Java 多线程中使用 JDK 自带工具类实现计数器

前言

在实际开发过程中,经常遇到需要多线程并行的业务,最后需要进行将各个线程完成的任务进行汇总,但主线程一般会早于子线程结束,如果要想等各个子线程完成后再继续运行主线程,这时就需要对各个线程是否执行完成进行标识,JDK 并发包中就给开发者提供了几个不错的使用工具类。

接下来将通过 Thread#join 方法以及 CountDownLatch、CyclicBarrier 类进行上面案例方案的分析。

Thread#join 方法

使用 join() 方法的子线程对象正常执行 run() 中代码,但当前线程会被无超时阻塞,等待执行 join() 方法的线程销毁后,继续执行被阻塞的当前线程。join() 方法阻塞原理是该方法内使用 wait() 方法阻塞,源码如下所示:

Java 多线程中使用 JDK 自带工具类实现计数器

子线程 join() 完成时会调用 notifyAll() 来通知当前线程继续执行接下来的代码。

假如现在有两个线程产生数据结果,最后将两个线程结果进行相加,如果直接将两个线程执行并进行汇总,如下实现代码:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 * 
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {


    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果:

Java 多线程中使用 JDK 自带工具类实现计数器

由于主线程的汇总计算可能早于子线程完成,所以这时获取子线程结果为空指针异常。

通过增加 join() 方法实现阻塞主线程,等待子线程完成后再进行汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 * 
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {


    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();
        
        // 两个线程分别调用 join() 方法,使主线程被阻塞
        thread1.join();
        thread2.join();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果为:

Java 多线程中使用 JDK 自带工具类实现计数器

通过结果可以看到子线程汇总求和为 3。此时主线程在两个子线程销毁前都处于等待状态,直至两个销毁后主线程再执行汇总求和,所以两个线程产生的值都已存在。

同时,子线程 join() 方法可以使当前线程无期限等待,也可以设置最长等待时长 join(long) 方法,无论子线程是否执行完成,当前线程会继续执行后面代码。使用方法加入超时参数即可,其它与 join() 方法使用相同。

CountDownLatch

CountDownLatch 可以使一个或多个线程等待其他线程完成操作后再继续执行当前线程后面代码。

CountDownLatch 的使用:首先创建 CountDownLatch 对象,通过传入参数 int 构造 CountDownLatch 对象。该参数是值将要等待的执行点的数量。

CountDownLatch 中有几个方法:

  • getCount() 返回当前计数器数,即当前剩余的等待数量。官方解释说该方法通常用于调试和测试目的。
  • countDown 每调用一次,计数器便会进行减 1 操作,但计数器必须大于 0。
  • await 该方法会阻塞当前线程,直至计数器为 0 时,就会不再阻塞当前线程。同时也提供 await(long timeout, TimeUnit unit) 方法,可设置超时时间。

利用 CountDownLatch 实现汇总求和案例,实现代码如下:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        CountDownLatch count = new CountDownLatch(2);

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
            count.countDown();
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
            count.countDown();
        });


        thread1.start();
        thread2.start();

        // 一直阻塞当前线程,直至计数器为 0
        count.await();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end.... getCount: " + count.getCount());
    }

}

执行结果如下:

Java 多线程中使用 JDK 自带工具类实现计数器

上图中求和结果为 3,同时计数器为 0。

通过查看 CountDownLatch 源码,主要是通过一个继承 AbstractQueuedSynchronizer 类的内部类 Sync 来实现的,可知其实现原理为 AQS,这里不进行展开讲述。

CyclicBarrier

CyclicBarrier 是一个可循环使用的屏障。实现原理解释,就是在一个或多个线程运行中设置一个屏障,线程到达这个屏障时会被阻塞,直到最后一个线程到达时,被屏障阻塞的线程继续执行。

CyclicBarrier 构造方法有两个,CyclicBarrier(int count)CyclicBarrier(int count, Runnable barrierAction):

  • 单个int参数构造方法,表示构造到达屏障线程的数量。
  • 一个int和一个Runnable参数构造方法,前者参数表示到达屏障线程的数量,后者参数表示所有线程到达屏障后接下来要执行的代码;

CyclicBarrier 中方法:

方法

说明

await()

阻塞前线程,等待 trip.signal() 或 trip.signalAll() 方法唤醒

await(long, TimeUnit)

在 await() 上增加两个参数,等待超时时间 timeout,单位为 unit

breakBarrier()

放开屏障,设置标志,唤醒被屏障阻塞的线程

isBroken()

阻塞的线程是否被中断

reset()

重置 CyclicBarrier 对象

getNumberWaiting()

当前被阻塞线程的数量

getParties()

到达屏障的线程总数量,即创建时指定的数量

使用 CyclicBarrier 实现上面汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CyclicBarrierTest {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        CyclicBarrier barrier = new CyclicBarrier(2, new Thread(()->{
            // 所有线程到达屏障后,需要执行的代码
            System.out.println(map.get("thread1") + map.get("thread2"));
            System.out.println("CyclicBarrier end.... ");
        }));

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();

    }
}

执行结果:

Java 多线程中使用 JDK 自带工具类实现计数器

执行完两条子线程,并且在子线程里调用barrier.await()后,屏障被打开,最后执行 CyclicBarrier 的最后的代码逻辑。

通过上面 CyclicBarrier 的方法可知,CyclicBarrier 比 CountDownLatch 使用更加灵活,CyclicBarrier 的 reset() 方法可以重置计数器,而 CountDownLatch 则只能使用一次。同时,CyclicBarrier 拥有更多线程阻塞信息的方法提供使用,在使用过程中,提供更加灵活的使用方式。

总结

上面三种方式,均由 JDK 的并发包中提供的工具。在多线程协作任务中,对计数器场景问题的解决方案,实现 main 线程对 worker 线程的等待完成。在实际开发应用中,使用频率也是非常之高。

推荐阅读

《Java 线程基础,从这篇开始》

《Java 线程通信之 wait/notify 机制》

《你必须会的 JDK 动态代理和 CGLIB 动态代理》

《Dubbo 扩展点加载机制:从 Java SPI 到 Dubbo SPI》

《Netty中粘包/拆包处理》

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
4、jstack查看线程栈信息
1、介绍利用jps、top、jstack命令找到进程中耗时最大的线程,以及线程状态等等,同时最后还可以显示出死锁的线程查找:FoundoneJavaleveldeadlock即可1、jps获得进程号!(https://oscimg.oschina.net/oscnet/da00a309fa6
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这