java8的Stream中的collect方法,用于对流中的数据进行归集操作,collect方法接受的参数是一个Collector,忽略掉静态方法后,Collector接口内容如下:
public interface Collector<T, A, R>
// 用于生成空的累加器实例,这个累加器的类型是A
Supplier<A> supplier();
// 生成一个用于执行归集操作的 BiConsumer<A,T>,A是supplier生成的累加器,T是数据流中的每个元素的数据类型,可以看作,把T累加到A
BiConsumer<A, T> accumulator();
// 并行归集时,需要对多个累加器进行合并操作
BinaryOperator<A> combiner();
// 作用是把A转换为R做为最终的返回值。即累加器类型转换函数
Function<A, R> finisher();
// 特性列表
Set<Characteristics> characteristics();
/** 特性:以下注释基本有道翻译自原始文档注释[手动笑哭脸] */
enum Characteristics {
/** 支持并行归集*/
/** 如果收集器不同时是无序的(UNORDERED),那么只在应用于无序数据源时,才应该并行归集 */
CONCURRENT,
/** 无序的,收集器并不按照Stream中的元素输入顺序执行 */
UNORDERED,
/** 表示完成器finisher方法返回的函数是一个恒等函数,这时,累加器对象就是归约操作的最终结果*/
/** 如果设置,则必须是这样一种情况:从A到R的未检查强制转换将会成功。 */
IDENTITY_FINISH
}
}
这个接口还是有一点点复杂的,3个泛型,5个方法,其中一个characteristics方法用于提供特性列表,其中最重要的就是CONCURRENT,是否允许并行归集; 而另外4个方法,可以返回4个函数,这4个函数各有用途;
我们梳理一下归集操作的主要流程,看看在这个过程中如何使用Collector
- 对一个数据集进行归集,首先要进行遍历,遍历的过程中,对数据集的每一个元素,进行某种操作,这个操作动作,由accumulator方法提供,该方法返回一个BiConsumer<A, T>函数,此函数消费两个参数,不返回值;该函数中<A, T>两个泛型,其中 T 表示数据集中的元素,而 A 表示对元素操作过程中产生的中间值进行存储的临时变量,即上面说的累加器;
- 上一步中提到了对数据操作的过程中需要对计算结果进行临时存储,那就需要一个存储的容器,该容器由supplier方法提供,该方法返回一个Supplier函数,可产生一个类型为A的对象;
- 如果是并行归集的话,需要一个方法把各个子任务的归集结果进行合并,combiner方法派上用场;
- 最后,计算完成后,还要返回一个归集结果,finisher方法可得到一个Function函数,把累加器A转换成最终的响应结果R并返回;
看一下Stream.collect的源码:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
// 声明一个存储中间值的容器变量(累加器)
A container;
// 如果是并行流,并且满足:
// [1. collector特性中包含CONCURRENT(并行操作);2. 流是无序的,或者collector特性中包含了UNORDERED(无需按顺序进行归集);],
//执行下面的代码进行并行归集,因为这种情况下,不需要专门提供一个将各个子任务进行合并的方法
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
// 首先从supplier方法中获得一个容器
container = collector.supplier().get();
// 拿到执行归集操作的BiConsumer函数
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
// 遍历集合,调用BiConsumer函数的accept方法,并且传入存储临时值的累加器和当前遍历到的元素,并行流中,forEach遍历是并行的
forEach(u -> accumulator.accept(container, u));
}
else {
// 不满足上述条件,就调用evaluate方法进行归集
// evaluate的细节先不说了,因为说来话长
container = evaluate(ReduceOps.makeRef(collector));
}
// 返回值,如果collector特性列表中包含了IDENTITY_FINISH,就返回容器自身,否则, collector.finisher()获得完成器,并传入累加器进行完成操作,最终的结果作为返回值
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
另外,Stream中的collect方法还有另一个重载方法:
public final <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super P_OUT> accumulator,
BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
}
省去了Collector,入参直接接受三个函数,正是上面讲到的三个函数,省去了对结果进行转换的finisher,而实现方法更是直接调用了evaluate,省去了是否并行流和Collector的特征判断;
下面看一下Collector的工具类Collectors中提供的几个常用实现;
其实实现只有一个:CollectorImpl
,结构也简单,构造器传入几个属性完事儿,方法实现中返回对应属性;
主要是看几个常用方法:
Collectors.toList()
public static <T> Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new,
List::add,
(left, right) -> {left.addAll(right); return left;},
CH_ID);
}
static final Set<Collector.Characteristics> CH_ID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
- ArrayList::new,创建一个ArrayList作为累加器;
- List::add,对流中元素的操作就是直接添加到累加器中;
- 如果是并行归集,对子任务归集结果进行全并的方法是 addAll,后一个子任务的结果直接全部添加到前一个子任务结果中;
- CH_ID是一个预定义好的特性列表,只有一个,IDENTITY_FINISH,表示累加器就是最终要返回的结果,不需要转换;
也可以看出Collectors.toList()归集器的归集结果是一个ArrayList,如果想转换为其它List的实现,还需要自己操作,或者定制一个归集器;
Collectors.toMap
public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
// 调用另一个toMap方法,最后一个参数表示supplier,生成累加器,一个HashMap
return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
}
public static <T, K, U, M extends Map<K, U>>
Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
// 这个是要对元素进行的操作,调用俩个映射函数分别生成k和v,然后加入到累加器中
// map.merge:如果给定key没绑定值或值为null,则绑定给定值,否则,执行重映射方法替换原来值或者删除原来的值。
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
// 最后特性表仍然是CH_ID,直接返回累加器
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
}
不多说,看注释就好
分组,这个比较复杂,是组合归集器:Collectors.groupingBy
// 传入一个映射函数,用于分成key,也就是我们的分组依据
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier) {
// 这里除了分组依据,还传入一个Collector,我们称之为子归集器,用于对分组后的数据进行归集
return groupingBy(classifier, toList());
}
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
// 这里HashMap::new就是supplier,用于生成累加器
return groupingBy(classifier, HashMap::new, downstream);
}
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
// 子归集器的累加器
Supplier<A> downstreamSupplier = downstream.supplier();
// 子归集器要进行的操作
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
// 父归集器的操作m表示累加器,t表示要操作的数据
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
// 把数据映射成Key
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
// 从累加器中取出子累加器,如果没有,用后面的函数生成一个并绑定
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
// 子归集器对子累加器和数据进行操作
downstreamAccumulator.accept(container, t);
};
// 并行归集的合并函数
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
// 累加器
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
// 如果子归集器特性中包含IDENTITY_FINISH,默认情况下子归集器是一个toList()的结果,本身是IDENTITY_FINISH的
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
// 最终还是创建一个CollectorImpl
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
// 子归集器的完成函数
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
// 父归集器的完成函数
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}
比较复杂,大概的过程,不过结合注释应该还是能看的懂;