java8 Collector 接口

Wesley13
• 阅读 896

java8的Stream中的collect方法,用于对流中的数据进行归集操作,collect方法接受的参数是一个Collector,忽略掉静态方法后,Collector接口内容如下:

public interface Collector<T, A, R>
    // 用于生成空的累加器实例,这个累加器的类型是A
    Supplier<A> supplier();
    // 生成一个用于执行归集操作的 BiConsumer<A,T>,A是supplier生成的累加器,T是数据流中的每个元素的数据类型,可以看作,把T累加到A
    BiConsumer<A, T> accumulator();
    // 并行归集时,需要对多个累加器进行合并操作
    BinaryOperator<A> combiner();
    // 作用是把A转换为R做为最终的返回值。即累加器类型转换函数
    Function<A, R> finisher();
    // 特性列表
    Set<Characteristics> characteristics();

    /** 特性:以下注释基本有道翻译自原始文档注释[手动笑哭脸] */
    enum Characteristics {
        /** 支持并行归集*/
    /** 如果收集器不同时是无序的(UNORDERED),那么只在应用于无序数据源时,才应该并行归集 */
        CONCURRENT,
        /** 无序的,收集器并不按照Stream中的元素输入顺序执行 */
        UNORDERED,
    /** 表示完成器finisher方法返回的函数是一个恒等函数,这时,累加器对象就是归约操作的最终结果*/
    /** 如果设置,则必须是这样一种情况:从A到R的未检查强制转换将会成功。 */
        IDENTITY_FINISH
    }
}

这个接口还是有一点点复杂的,3个泛型,5个方法,其中一个characteristics方法用于提供特性列表,其中最重要的就是CONCURRENT,是否允许并行归集; 而另外4个方法,可以返回4个函数,这4个函数各有用途;

我们梳理一下归集操作的主要流程,看看在这个过程中如何使用Collector

  1. 对一个数据集进行归集,首先要进行遍历,遍历的过程中,对数据集的每一个元素,进行某种操作,这个操作动作,由accumulator方法提供,该方法返回一个BiConsumer<A, T>函数,此函数消费两个参数,不返回值;该函数中<A, T>两个泛型,其中 T 表示数据集中的元素,而 A 表示对元素操作过程中产生的中间值进行存储的临时变量,即上面说的累加器;
  2. 上一步中提到了对数据操作的过程中需要对计算结果进行临时存储,那就需要一个存储的容器,该容器由supplier方法提供,该方法返回一个Supplier函数,可产生一个类型为A的对象;
  3. 如果是并行归集的话,需要一个方法把各个子任务的归集结果进行合并,combiner方法派上用场;
  4. 最后,计算完成后,还要返回一个归集结果,finisher方法可得到一个Function函数,把累加器A转换成最终的响应结果R并返回;

看一下Stream.collect的源码:


    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        // 声明一个存储中间值的容器变量(累加器)
    A container;
    // 如果是并行流,并且满足:
    // [1. collector特性中包含CONCURRENT(并行操作);2. 流是无序的,或者collector特性中包含了UNORDERED(无需按顺序进行归集);],
    //执行下面的代码进行并行归集,因为这种情况下,不需要专门提供一个将各个子任务进行合并的方法
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        // 首先从supplier方法中获得一个容器
            container = collector.supplier().get();
        // 拿到执行归集操作的BiConsumer函数
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        // 遍历集合,调用BiConsumer函数的accept方法,并且传入存储临时值的累加器和当前遍历到的元素,并行流中,forEach遍历是并行的
            forEach(u -> accumulator.accept(container, u));
        }
        else {
        // 不满足上述条件,就调用evaluate方法进行归集
        // evaluate的细节先不说了,因为说来话长
            container = evaluate(ReduceOps.makeRef(collector));
        }
    // 返回值,如果collector特性列表中包含了IDENTITY_FINISH,就返回容器自身,否则, collector.finisher()获得完成器,并传入累加器进行完成操作,最终的结果作为返回值
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

另外,Stream中的collect方法还有另一个重载方法:

    public final <R> R collect(Supplier<R> supplier,
                                   BiConsumer<R, ? super P_OUT> accumulator,
                                   BiConsumer<R, R> combiner) {
            return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
    }

省去了Collector,入参直接接受三个函数,正是上面讲到的三个函数,省去了对结果进行转换的finisher,而实现方法更是直接调用了evaluate,省去了是否并行流和Collector的特征判断;


下面看一下Collector的工具类Collectors中提供的几个常用实现;

其实实现只有一个:CollectorImpl,结构也简单,构造器传入几个属性完事儿,方法实现中返回对应属性;

主要是看几个常用方法:


Collectors.toList()

    public static <T> Collector<T, ?, List<T>> toList() {
            return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new,
                    List::add,
                                (left, right) -> {left.addAll(right); return left;},
                    CH_ID);
        }
        
    static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
  1. ArrayList::new,创建一个ArrayList作为累加器;
  2. List::add,对流中元素的操作就是直接添加到累加器中;
  3. 如果是并行归集,对子任务归集结果进行全并的方法是 addAll,后一个子任务的结果直接全部添加到前一个子任务结果中;
  4. CH_ID是一个预定义好的特性列表,只有一个,IDENTITY_FINISH,表示累加器就是最终要返回的结果,不需要转换;

也可以看出Collectors.toList()归集器的归集结果是一个ArrayList,如果想转换为其它List的实现,还需要自己操作,或者定制一个归集器;


Collectors.toMap

    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper) {
    // 调用另一个toMap方法,最后一个参数表示supplier,生成累加器,一个HashMap
        return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
    }
    
    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
    // 这个是要对元素进行的操作,调用俩个映射函数分别生成k和v,然后加入到累加器中
    // map.merge:如果给定key没绑定值或值为null,则绑定给定值,否则,执行重映射方法替换原来值或者删除原来的值。
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
    // 最后特性表仍然是CH_ID,直接返回累加器
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

不多说,看注释就好


分组,这个比较复杂,是组合归集器:Collectors.groupingBy

    // 传入一个映射函数,用于分成key,也就是我们的分组依据
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
    // 这里除了分组依据,还传入一个Collector,我们称之为子归集器,用于对分组后的数据进行归集
        return groupingBy(classifier, toList());
    }
    public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
    // 这里HashMap::new就是supplier,用于生成累加器
        return groupingBy(classifier, HashMap::new, downstream);
    }

    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        // 子归集器的累加器
        Supplier<A> downstreamSupplier = downstream.supplier();
        // 子归集器要进行的操作
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        // 父归集器的操作m表示累加器,t表示要操作的数据
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            // 把数据映射成Key
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            // 从累加器中取出子累加器,如果没有,用后面的函数生成一个并绑定
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            // 子归集器对子累加器和数据进行操作
            downstreamAccumulator.accept(container, t);
        };
    // 并行归集的合并函数
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
    // 累加器
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

    // 如果子归集器特性中包含IDENTITY_FINISH,默认情况下子归集器是一个toList()的结果,本身是IDENTITY_FINISH的
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        // 最终还是创建一个CollectorImpl
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
        // 子归集器的完成函数
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
        // 父归集器的完成函数
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }

比较复杂,大概的过程,不过结合注释应该还是能看的懂;

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
Java日期时间API系列30
  实际使用中,经常需要使用不同精确度的Date,比如保留到天2020042300:00:00,保留到小时,保留到分钟,保留到秒等,常见的方法是通过格式化到指定精确度(比如:yyyyMMdd),然后再解析为Date。Java8中可以用更多的方法来实现这个需求,下面使用三种方法:使用Format方法、 使用Of方法和使用With方法,性能对比,使用
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(