Java并行流指北

javalover123
• 阅读 360

一、前言

  • Java并行流,方便了 并发操作,但是不注意可能会导致问题。
  • 如 最大线程数,怎么控制并发数,类加载器,线程上下文变化,ForkJoinPool 的 execute、submit、invoke 方法的区别 等。
  • 注意:本文以 openjdk 11.0.10 为例,没有特殊说明时,都是指 ForkJoinPool.commonPool()

二、注意点

1. 并行度

  • 并行度 不等于 最大线程数(maximumPoolSize),下图 commonPool 有49个线程,但是 并行度为1
  • 默认的 并行度为 CPU核数 - 1,最小为 1
  • 可通过 -Djava.util.concurrent.ForkJoinPool.common.parallelism=数量 设置 Java并行流指北

2. 容器里面的并行度

  • 下图中,/sys/fs/cgroup/cpu/cpu.cfs_quota_us 除以 /sys/fs/cgroup/cpu/cpu.cfs_period_us = cpu核数
  • 不等于 nproc,更不等于 获得宿主机的 lscpu | grep 'CPU(s):' Java并行流指北

3. 最大线程数

  • 并行度 不等于 最大线程数(maximumPoolSize)
  • 即使 并行度 parallelism 为1,还有 备用线程(maximumPoolSize、COMMON_MAX_SPARES)
  • commonPool 默认 256,自定义 ForkJoinPool() 默认 32767。这样看,比较少会出现 线程数不够的情况。 Java并行流指北

4. 并发太大,压垮后端

  • 假如 ForkJoinPool.commonPool() 线程比较多,并行流集合的元素也比较多时,给下游较大压力
  • jstack pid | grep -c commonPool

5. 线程上下文变化

如:获取不到用户信息了,可以获取到用户信息以后,传到并行流使用

final String deviceUdid = RequestUtils.getDeviceUdid();
data.parallelStream().forEach(d -> {
    // use deviceUdid instead of RequestUtils.getDeviceUdid() do something
});

6. ForkJoinPool 的 execute、submit、invoke 方法的区别

  • 有些简单的任务,不想单独创建线程池,可以用 ForkJoinPool.commonPool()
  • execute():异步执行,没有返回值,不能等待执行完成
  • submit():异步执行,返回 ForkJoinTask,需增加 .join() 等待完成
  • invoke():等于 submit() + join()

7. spring boot使用Java并行流发送kafka消息报错

  • 类加载器不一样,详见 spring boot 使用 Java 并行流发送 kafka 消息报错
  • 使用 spring-boot-maven-plugin 打包以后,依赖在 jar里面自定义位置(BOOT-INF/lib/),使用 org.springframework.boot.loader.LaunchedURLClassLoader 加载
  • ForkJoinPool.commonPool 默认使用 DefaultForkJoinWorkerThreadFactory,用的 系统ClassLoader,所以 并行流加载不到依赖的 class
  • 可通过 -Djava.util.concurrent.ForkJoinPool.common.threadFactory 设置 自定义线程工厂,使用当前 ClassLoader 解决 Java并行流指北

8. 自定义并行流线程池

参考 concurrency - Custom thread pool in Java 8 parallel stream - Stack Overflow

  • 方案一(各种情况都有效)

    CompletableFuture.runAsync(runnable, new ForkJoinPool(2)).join()
  • 方案二(部分场景似乎没有效果)

    // 第4个参数 asyncMode,默认 false,设置为 true 适用于 FIFO
    ForkJoinPool forkJoinPool = new ForkJoinPool(2, pool -> new ForkJoinWorkerThread(pool) {
    }, null, false);
    forkJoinPool.invoke(() -> list.parallelStream().forEach());

9. 控制并发数

  • 可考虑把 集合切分成需要的份数,然后 parallelStream()
    List<String> list = List.of("a", "b", "c");
    CollUtil.split(list, list.size() / 2 + 1).parallelStream().forEach(b -> {
      b.stream().forEach(System.out::println);
    });

10. 顺序消费

  • 如 forEachOrdered 会导致没有并发效果
  • 需要并行,还要使用输入顺序的,可考虑把 集合切分成需要的份数,然后 parallelStream()

三、总结

本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。
本文首先发布于 https://www.890808.xyz/ ,其他平台需要审核更新慢一些。

Java并行流指北

点赞
收藏
评论区
推荐文章
九鹤 九鹤
3年前
并发编程的基础概念
什么是线程?什么是进程?java可以开启线程吗?不能因为Java无法直接操硬件,他是运行在虚拟机上面的,什么是并发?什么是并行?并发就是多个线程去操作一个资源。并行是多个线程同时行,但是操作的资源不是同一个。线程的六个状态new(诞生)runnable(运行)Blocked(阻塞)waiiiing(等待)Tiemwaiing(超时等待)
Wesley13 Wesley13
3年前
JAVA多线程学习
Java通过Excutors提供四种线程池:newCachedThreadPool        创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool        创建一个定长线程,可控制线程最大并发数
Wesley13 Wesley13
3年前
JUC并发编程之:简单概述(一)
JUC并发编程之:简单概述(一)内容概述:·进程和线程、并发和并行、同步和异步概念·如何查看和关闭进程·Java线程常用的类和方法一、概念:一、进程与线程1·进程·程序由指令和数据组成,但这些指令要运行,数据要读写,就
Wesley13 Wesley13
3年前
Java通过Executors提供四种线程池
Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建
Wesley13 Wesley13
3年前
Go 并发
Go并发并发指的是同时处理多个任务的能力。并行指的是并行处理多个任务的能力。并行不一定加快运行速度,因为并行组件之间可能需要互相通信。Go中使用协程,信道来处理并发。协程Go中主要通过协程实现并发。协程是与其他函数或方法一起并发运行的函数或方法,协程可以看作是轻量级线程,但是创建成本更小,我们经常
Wesley13 Wesley13
3年前
Java总结:Java多线程
多线程作为Java中很重要的一个知识点,在此还是有必要总结一下的。Java给多线程编程提供了内置的支持。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。多线程是多任务的一种特别的形式,但多线程使用了更小的资源开销。这里定义和线程相关的另一个术语进程:一个进程包括由操作系统分配的内存空间,
Wesley13 Wesley13
3年前
Java 并发编程:进程、线程、并行与并发
一谈到Java并发编程,我们一般就会联想起进程、线程、并行、并发等等概念。那么这些概念都代表什么呢?进程与线程有什么关系?并发与并行又是什么关系呢?进程与线程进程是指程序的一次动态执行过程,通常我们说计算机中正在执行的程序就是进程,每个程序都会对应着一个进程。一个进程包含了从代码加载到执行完成的一个完整过程,它是操作系统资源分配最小单
Wesley13 Wesley13
3年前
Java并发编程:进程、线程、并行与并发
一谈到Java并发编程,我们一般就会联想起进程、线程、并行、并发等等概念。那么这些概念都代表什么呢?进程与线程有什么关系?并发与并行又是什么关系呢?01 进程与线程进程是指程序的一次动态执行过程,通常我们说计算机中正在执行的程序就是进程,每个程序都会对应着一个进程。一个进程包含了从代码加载到执行完成的一个完整过程,它是操作系
Stella981 Stella981
3年前
Python 浅析线程(threading模块)和进程(process)
    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务进程与线程什么是线程(threading)?Athreadisanexecutioncontext,whichisall
深入浅出线程池 | 京东云技术团队
一、线程1、什么是线程线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。2、如何创建线程2.1、JAVA中
javalover123
javalover123
Lv1
10年Java经验,多个开源项目贡献者。https://github.com/javalover123
文章
16
粉丝
2
获赞
5