Lambda表达式是Java8最重要的新特性,基础的内容这里就不说了,让我们从收集器开始。
什么是收集器
就是用于收集流运算后结果的角色。例如:
List<String> collect = list.stream().map(TestBean::getName).collect(Collectors.toList());
以上代码最后的Collectors.toList(),就是Java8原生的收集器,用于把流结果放到一个List中。
原生的收集器还有很多,大多定义在Collectors下的静态方法。
通过收集器的组合,能产生很复杂的收集效果,这里就不展开说了,有兴趣的可以去翻翻。
这里要说的是在原生收集器用的不趁手的时候,如何自定义一个收集器。
其实蛮简单的。
实现Collector接口就可以了。
拿一个例子来说吧,现在要定义把一个整数流中所有数求和的收集器。
比如对于流:1,2,3,4,5....
计算出来就是:1+2+3+....
好,现在来定义一个类,实现Collector接口:
Collector接口
public class LongSumCollector implements
Collector<Long, LongSumCollector, Long> {
...
}
Collector接口有三个范型定义:
第一个,定义流的数据类;
第二个,定义保存中间计算结果的类,这里我们写的是收集器自身,也可以定义其他计算类;
第三个,定义流收集器的最后结果类型,就是计算出来的最后结果是个什么类型;
Collector接口有多个方法要实现,结构如下:
public class LongSumCollector implements
Collector<Long, LongSumCollector, Long>{
@Override
public Supplier<LongSumCollector > supplier() {
return null;
}
@Override
public BiConsumer<LongSumCollector, Long> accumulator() {
return null;
}
@Override
public BinaryOperator<LongSumCollector> combiner() {
return null;
}
@Override
public Function<LongSumCollector, Long> finisher() {
return null;
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return null;
}
}
下面来挨个说明一下:
supplier()方法
对计算器的初始化,就是接口范型中间那个定义的计算器类型,方法返回一个创建计算器的lambda:
@Override
public Supplier<LongSumCollector> supplier() {
return () -> new LongSumCollector();
}
accumulator()方法
计算器对流中数据的处理,方法返回的也是lambda,第一个参数为计算器,第二个参数为流程中的数据:
@Override
public BiConsumer<LongSumCollector, Long> accumulator() {
return (collector, value) -> {
collector.sum += value;
};
}
代码中的sum用来记录求合结果
combiner()方法
用于在并发计算的情况下,对各路并发计算的结果进行合并,方法返回的lambda,两个参数就是进行合并的两路计算器,lambda要求最后返回合并的结果。
@Override
public BinaryOperator<LongSumCollector> combiner() {
return (c1, c2) -> {
c1.sum += c2.sum;
return c1;
};
}
finisher()方法
输出最后的结果:
@Override
public Function<LongSumCollector, Long> finisher() {
return (c1) -> c1.sum;
}
characteristics() 方法
用于标记收集器的特性,这里先返回一个空Set。
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return new HashSet<>();
}
好,现在收集器已经自定义完了,怎么用呢?
这里写一个测试代码
// 数据流列表
List<Long> list = new ArrayList<Long>(100 * 10000);
// 生成100W个数
LongStream.range(1, 100 * 10000).forEach((value) -> list.add(value));
// 用流进行数据计算
Long sum = list.stream().collect(new LongSumCollector());
最后collect方法返回的值就是求和的结果,这种方式是不是很优雅?
并行
=====
在流程计算下,并行的实现简直是有够简单粗暴。
只需要用parallelStream()方法返回的流进行的计算就是并行的:
Long sum = list.parallelStream().collect(new LongSumCollector());
为了展式并行和串行的差别,加以时间计算和结果输出进行对比:
// 数据流列表
List<Long> dataList = new ArrayList<Long>(1000 * 10000);
// 生成1000W个数
LongStream.range(1, 1000 * 10000).forEach(
(value) -> dataList.add(value));
System.out.println("start");
// 用串行流进行数据计算
long time1 = System.currentTimeMillis();
Long sum1 = dataList.stream().sequential()
.collect(new LongSumCollector());
time1 = System.currentTimeMillis() - time1;
System.out.println("stream :" + time1 + ":" + sum1);
// 用并行流进行数据计算
long time2 = System.currentTimeMillis();
Long sum2 = dataList.parallelStream().collect(new LongSumCollector());
time2 = System.currentTimeMillis() - time2;
System.out.println("parallelStream:" + time2 + ":" + sum2);
System.out.println(time2 / (time1 * 1.0));
打印的结果如下:
stream :68:49999995000000
parallelStream:121:49999995000000
1.7794117647058822
什么情况,并行居然是串行的近2倍的时间!
好吧,测试用机只有两核,所以并行的效率发挥不出来。