Java Stream

Wesley13
• 阅读 787

1 Stream简介

Stream是数据渠道,用于操作数据源(集合,数组等)所生成得元素序列。而集合讲得是数据,流讲得是计算。

注意:

  • Stream 自己不会存储元素。
  • Stream 不会改变源对象。相反,它会返回一个持有结果得新Stream
  • Stream 操作时延迟执行得,这意味着它们会等到需要结果时才执行。(延迟加载)

Stream 操作步骤

  1. Stream 创建: 一个数据源(集合,数组),获取一个流。
  2. Stream 中间操作: 一个中间操作链,对数据源的数据进行处理。
  3. Stream 终止操作: 一个终止操作,执行中间操作链,并产生结果。

2 Stream 用法

2.1 创建Stream

//1. 通过 Collection.stream() / parallelStream() 创建Stream
List<String> list = new ArrayList<String>();
Stream<String> stream11 = list.stream();             // 串行流
Stream<String> stream12 = list.parallelStream();     // 并行流

//2. 通过 Arrays.stream() 获取数组流
IntStream stream2 = Arrays.stream(new int[]{1,2});  // 串行流

//3. 通过 Stream.of() 获取流
Stream<String> stream3 = Stream.of("123", "456");    // 串行流

//4. 创建无限流,需要配合 limit() 截断,不然无限制下去
Stream<Integer> stream41 = Stream.iterate(2, (x) -> x * 2);   // 串行流
Stream<Double> stream42 = Stream.generate(Math::random);      // 串行流

2.2 Stream 中间操作

多个中间操作可以连接起来形成一个流水线,除非流水线上触发终止操作,否则中间操作不会执行任何得处理。而终止操作时一次性全部处理,称为‘延迟加载’

中间操作(例举部分)

说明

limit(long maxSize)

截断,使其元素不超过给定数量

filter(Predicate predicate)

过滤,从流中过滤出想要的元素

skip(long n)

忽略,跳过前n个元素,若流中元素不足n个,则返回空

distinct()

去重,通过元素 hashCode() 和 equals() 去除重复元素

map(Funcation<T,R> mapper)

映射,函数会被应用到每个元素上,并将其映射成一个新的元素

flatMap(Function<T, Stream> mapper)

映射,将流中的每个值都换成一个流,然后把所有流连接成一个流

sorted()

排序,自然排序

sorted(Comparator comparator)

排序,定制排序

// 中间操作:不会执行任何操作
Stream<Double> stream = Stream.generate(Math::random)   // double 无限流
.limit(20)                                      // 截断,取前 20 个
.filter(x -> x > 0.3)                           // 过滤,取大于 0.3 的元素
.skip(1)                                        // 忽略,丢弃第一个元素
.distinct()                                     // 去重
.map(x -> x * 10)                               // 映射,将每个元素扩大 10 倍
.sorted();                                      // 对 double 流进行排序

// 终止操作,只有执行终止操作才会执行全部。即:延迟加载
stream.forEach(System.out::println);


// 中间操作:flatMap 接收一个函数作为参数,将流中的每个值都换成一个流,然后把所有流连接成一个流
List<String> list = Arrays.asList("aaa", "bbb", "ccc", "ddd");
list.stream().flatMap((e) -> filterCharacter(e)).forEach(System.out::println);

//如果使用map则需要这样写
list.stream().map((e) -> filterCharacter(e)).forEach((e) -> {
    e.forEach(System.out::println);
});

public Stream<Character> filterCharacter(String str){
    List<Character> list = new ArrayList<>();
    for (Character ch : str.toCharArray()) {
        list.add(ch);
    }
    return list.stream();
}

2.3 Stream 终止操作

2.3.1 查找与匹配

操作(例举部分)

说明

allMatch(Predicate predicate)

检查是否匹配所有元素

anyMatch(Predicate predicate)

检查是否至少匹配所有元素

noneMatch(Predicate predicate)

检查是否没有匹配所有元素

findFirst()

返回第一个元素

findAny()

返回当前流中任意元素

count()

返回流中元素总个数

max(Comparator comparator)

返回流中最大值

min(Comparator comparator)

返回流中最小值

2.3.2 规约 - 将流中元素结合在一起,返回一个值

操作(例举部分)

说明

reduce(T identitty,BinaryOperator

需要传一个起始值,然后,传入的是一个二元运算

reduce(BinaryOperator

没有起始值,有可能结果为空,所以返回的值会被封装到Optional中

// 求和
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Integer sum = list.stream().reduce(0, (x, y) -> x + y);

// 求和,没有起始值,则有可能结果为空,所以返回的值会被封装到Optional中
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Optional<Integer> sum = list.stream().reduce(Integer :: sum);

2.3.3 收集

将流转换为其他形式。接收一个Collector接口的实现,用于给Stream中元素做汇总的方法。Collector接口方法的实现决定了如何对流执行收集操作(如收集到List,Set,Map)。但是Collectors实用类提供了很多静态方法,可以方便地创建常见得收集器实例。

操作(例举部分)

说明

Collectors.toList()

将流转换成List

Collectors.toSet()

将流转换为Set

Collectors.toCollection(Supplier supplier)

将流转换为其他类型的集合

Collectors.counting()

元素个数

Collectors.averagingInt/Long/Double(Function<T,R> function)

平均数,不同之处在于传入得参数类型不同,返回值都为Double

Collectors.summingInt/Long/Double(Function<T,R> function)

求和,不同之处在于传入得参数类型不同,返回值为Integer, Double, Long

Collectors.maxBy(Comparator comparator)

最大值

Collectors.minBy(Comparator comparator)

最小值

Collectors.groupingBy(Function<T,R> function)

分组,返回Map

Collectors.partitioningBy(Predicate predicate)

分区,传入函数返回true和false 分成两个区,返回Map

3 并行流

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。Java8中将并行流进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过parallel()与scqucntial()在并行流与顺序流之间进行切换。

3.1 Fork-Join 框架

Fork—Join框架:是java7提供得一个用于执行任务得框架,就是在必要得情况下,将一个大任务,进行拆分(Fork)成若干个小任务(拆分到不能再拆分),再将一个个的小任务运算得结果进行join汇总。

Fork—Join框架时ExecutorService接口得一种具体实现,目的是为了帮助更好地利用多处理器带来得好处。它是为那些能够被递归地拆分成子任务的工作类型量身设计的。起目的在于能够使用所有有可用的运算能力来提升你的应用的性能。 Java Stream

关于 Fork-Join 实现原理请看这篇:图解Fork/Join https://mp.weixin.qq.com/s/OzZFGW_8GBYHUa0Ef10WVg

/**
 * 要想使用Fark—Join,类必须继承RecursiveAction(无返回值)或者 RecursiveTask(有返回值)
 *
 * 计算从 start 到 end 的数字累加
 */
public class ForkJoin extends RecursiveTask<Long> {

    private long start; // 起始数字
    private long end;   // 结束数字

    public ForkJoin(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // 拆分的最小区间
    private static final long THRESHOLD = 10000L;

    @Override
    protected Long compute() {
        // 当区间小于最小区间时,直接计算累加
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (long i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        } else { // 否则,将区间一分为二,分给两个不同的线程去计算
            // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
            long middle = start + (end - start) / 2;

            ForkJoin left = new ForkJoin(start, middle);    // 递归,直到分解到最小区间后,开始计算
            left.fork();    // 拆分子任务,压入线程队列

            ForkJoin right = new ForkJoin(middle, end);     // 递归,直到分解到最小区间后,开始计算
            right.fork();   // 拆分子任务,压入线程队列

            // 合并两部分计算的值
            return left.join() + right.join();
        }
    }

    public static void main(String[] args) {
        // 开始时间
        Instant start = Instant.now();

        // 这里需要一个线程池的支持
        ForkJoinPool pool = new ForkJoinPool();

        // 累加到 1 亿
        ForkJoinTask<Long> task = new ForkJoin(0, 100000000L);

        long sum = pool.invoke(task);

        // 结束时间
        Instant end = Instant.now();

        System.out.println(String.format("累加到1亿的计算时间为:%s 毫秒,值:%s", Duration.between(start, end).toMillis(), sum));
    }
}

3.2 并行流对 Fork-Join 的简化

//开始时间
Instant start = Instant.now();

long sum = LongStream.rangeClosed(0, 1000000000L)  // 创建0-1亿的数字串行流
    .parallel()                                 // 转换为并行流,使用Fort-Join框架,缺省使用ForkJoinPool.commonPool()线程池
    .reduce(0, Long :: sum);        // 规约计算所有元素累加

//结束时间
Instant end = Instant.now();
System.out.println(String.format("累加到1亿的计算时间为:%s 毫秒,值:%s", Duration.between(start, end).toMillis(), sum));

3.3 并行流的性能

性能测试请看:Stream Performance https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/8-Stream%20Performance.md 此处引用结论:

* 对于简单操作,比如最简单的遍历,Stream串行API性能明显差于显示迭代,但并行的Stream API能够发挥多核特性。
* 对于复杂操作,Stream串行API性能可以和手动实现的效果匹敌,在并行执行时Stream API效果远超手动实现。

所以,如果出于性能考虑,
1. 对于简单操作推荐使用外部迭代手动实现,
2. 对于复杂操作,推荐使用Stream API, 
3. 在多核情况下,推荐使用并行Stream API来发挥多核优势,
4. 单核情况下不建议使用并行Stream API。

如果出于代码简洁性考虑,使用Stream API能够写出更短的代码。
即使是从性能方面说,尽可能的使用Stream API也另外一个优势,
那就是只要Java Stream类库做了升级优化,代码不用做任何修改就能享受到升级带来的好处。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java8新特性
Stream将List转换为Map,使用Collectors.toMap方法进行转换背景:User类,类中分别有id,name,age三个属性。List集合,userList,存储User对象1、指定keyvalue,value是对象中的某个属性值。 Map<Integer,StringuserMap1userList.str
Wesley13 Wesley13
3年前
JDK1.8 之Stream API总结
Stream是Java8新增加的类,用来补充集合类。Stream代表数据流,流中的数据元素的数量可能是有限的,也可能是无限的。Stream和其它集合类的区别在于:其它集合类主要关注与有限数量的数据的访问和有效管理(增删改),而Stream并没有提供访问和管理元素的方式,而是通过声明数据源的方式,利用可计算的操作在数据源上执行,当然
Wesley13 Wesley13
3年前
Java8系列之Stream总结
流的简介  官方解释,Stream是Java8的一大亮点,它与java.io包里的InputStream和OutputStream是完全不同的概念。它也不同于StAX对XML的解析的Stream,也不是AmazonKinesis对大数据实时处理的Stream。它是对集合对象功能的增强,她专注于对集合对象进行各种非常便利、高效的聚合操作(ag
Wesley13 Wesley13
3年前
Java学习:Stream流式思想
Stream流Java8API添加了一种新的机制——Stream(流)。Stream和IO流不是一回事。流式思想:像生产流水线一样,一个操作接一个操作。使用Stream流的步骤:数据源→转换成流→操作1→操作2→……数据源(source):可以是集合、数组等。St
Wesley13 Wesley13
3年前
Java 8新特性之Stream 概念
Java8中有两大最为重要的改变。第一个是Lambda表达式;另外一个则是StreamAPI(java.util.stream.\)。Stream是Java8中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。使用StreamAPI对集合数据进行操作,就类似于使用SQL执行
Wesley13 Wesley13
3年前
Java8 新特性之集合操作Stream
Java8新特性之集合操作StreamStream简介Java8引入了全新的StreamAPI。这里的Stream和I/O流不同,它更像具有Iterable的集合类,但行为和集合类又有所不同。stream是对集合对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作,或者大批量数据操作。
Wesley13 Wesley13
3年前
Java8 数据流
一、基本知识\\数据流(stream)\\是对集合(collection)功能的增强,更专注于对集合对象的各种便利、高效的聚合,大批量数据操作。数据流的特点:元素序列流提供了一组特定类型的以顺序方式元素。源流使用集合,数组或I/O资源为输入源。聚合操作数据流支持如filter
京东云开发者 京东云开发者
11个月前
使用Flink完成流数据统计 | 京东云技术团队
Flink程序构建的基本单元是stream和transformation(DataSet实质上也是stream)。stream是一个中间结果数据,transformation对数据的加工和操作,该操作以一个或多个stream为输入,计算输出一个或多个stream为结果,最后可以sink来存储数据。