第1章 RDD 概念
1.1 RDD 为什么会产生
RDD:Resilient Distributed Dataset 弹性分布式数据集
RDD 是 Spark 的基石,是实现 Spark 数据处理的核心抽象。那么 RDD 为什么会产生呢?
Hadoop 的 MapReduce 是一种基于数据集的工作模式,面向数据,这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备。数据更多面临的是一次性处理。
MR 的这种方式对数据领域两种常见的操作不是很高效。第一种是迭代式的算法
。比如机器学习中 ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。MR 这种模式不太合适,即使多 MR 串行处理,性能和时间也是一个问题。数据的共享依赖于磁盘。另外一种是交互式数据挖掘
,MR 显然不擅长。MR 中的迭代:
Spark中的迭代:
我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。
但是无论是 MR 还是 RDD 都应该具有类似位置感知、容错和负载均衡等特性。
1.2 RDD 概述
1.2.1 什么是 RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集
,是 Spark 中最基本的数据抽象,它代表一个不可变
、可分区(分片)
、里面的元素可并行计算的集合(弹性)
。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。RDD 具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如 count() 和 first()。
Spark 采用惰性计算
模式,RDD 只有第一次在一个行动操作
中用到时,才会真正计算。Spark 可以优化整个计算过程。默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。
1.2.2 RDD 的属性
1) 一组分片(Partition),即
数据集的基本组成单位
。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
2) 一个计算每个分区的函数。Spark 中 RDD 的计算是以分片为单位的
,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
3) RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
4) 一个 Partitioner,即 RDD 的分片函数。当 前Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了Parent RDD Shuffle 输出时的分片数量。
5) 一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
RDD 是一个应用层面的逻辑概念。一个 RDD 多个分片。RDD 就是一个元数据记录集,记录了 RDD 内存所有的关系数据。
1.3 RDD 弹性
1) 自动进行内存和磁盘数据存储的切换
Spark 优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换。
2) 基于血统的高效容错机制
在 RDD 进行转换和动作的时候,会形成 RDD 的 Lineage 依赖链,当某一个 RDD 失效的时候,可以通过重新计算上游的 RDD 来重新生成丢失的 RDD 数据。
3) Task 如果失败会自动进行特定次数的重试
RDD 的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是 4 次。
4) Stage 如果失败会自动进行特定次数的重试
如果 Job 的某个 Stage 阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是 4 次。
5) Checkpoint 和 Persist 可主动或被动触发
RDD 可以通过 Persist 持久化将 RDD 缓存到内存或者磁盘,当再次用到该 RDD 时直接读取就行。也可以将 RDD 进行检查点,检查点会将数据存储在 HDFS 中,该 RDD 的所有父 RDD 依赖都会被移除。
6) 数据调度弹性
Spark 把这个 JOB 执行模型抽象为通用的有向无环图 DAG,可以将多 Stage 的任务串联或并行执行,调度引擎自动处理 Stage 的失败以及 Task 的失败。
7) 数据分片的高度弹性
可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。RDD 全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他 RDD 转换而创建,为此,RDD 支持丰富的转换操作(如 map, join, filter, groupby 等),通过这种转换操作,新的 RDD 则包含了如何从其他 RDDs 衍生所必需的信息,所以说 RDDs 之间是有依赖关系的。基于 RDDs 之间的依赖,RDDs 会形成一个有向无环图 DAG,该 DAG 描述了整个流式计算的流程,实际执行的时候,RDD 是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于 RDD 的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的 DAG,然后写回稳定存储。另外 RDD 还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说 Spark 最初也就是实现 RDD 的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD 的关系类似于 Hadoop-MapReduce 关系。
1.4 RDD 特点
RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换操作,由一个 RDD 得到一个新的 RDD,新的 RDD 包含了从其他 RDD 衍生所必需的信息。RDDs 之间存在依赖,RDD 的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化 RDD 来切断血缘关系。
1.4.1 分区
RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。如果 RDD 是通过已有的文件系统构建,则 compute 函数是读取指定文件系统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute 函数是执行转换逻辑将其他 RDD 的数据进行转换。
1.4.2 只读
如下图所示,RDD 是只读的,要想改变 RDD 中的数据,只能在现有的 RDD 基础上创建新的 RDD。
由一个 RDD 转换到另一个 RDD,可以通过丰富的操作算子实现,不再像 MapReduce 那样只能写 map 和 reduce 了,如下图所示。
RDD 的操作算子包括两类,一类叫做 transformations,它是用来将 RDD 进行转化,构建 RDD 的血缘关系;另一类叫做 actions,它是用来触发 RDD 的计算,得到 RDD 的相关计算结果或者将 RDD 保存的文件系统中。下图是 RDD 所支持的操作算子列表。
1.4.3 依赖
RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs 之间分区是一一对应的,另一种是宽依赖,下游 RDD 的每个分区与上游 RDD (也称之为父 RDD)的每个分区都有关,是多对多的关系。
通过 RDDs 之间的这种依赖关系,一个任务流可以描述为 DAG (有向无环图),如下图所示,在实际执行过程中宽依赖对应于 Shuffle (图中的 reduceByKey 和 join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中 map 和 union 可以一起执行)。
1.4.4 缓存
如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该 RDD 的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1 经过一系列的转换后得到 RDD-n 并保存到 hdfs,RDD-1 在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的 RDD-1 转换到 RDD-m 这一过程中,就不会计算其之前的 RDD-0 了。
1.4.5 CheckPoint
虽然 RDD 的血缘关系天然地可以实现容错,当 RDD 的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs 之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD 支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为 checkpoint 后的 RDD 不需要知道它的父 RDDs 了,它可以从 checkpoint 处拿到数据。
给定一个 RDD 我们至少可以知道如下几点信息:
1、分区数以及分区方式;
2、由父 RDDs 衍生而来的相关依赖信息;
3、计算每个分区的数据,计算步骤为:
1)如果被缓存,则从缓存中取的分区的数据;
2)如果被 checkpoint,则从 checkpoint 处恢复数据;
3)根据血缘关系计算分区的数据。
第2章 RDD 编程
2.1 RDD 编程模型
在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。经过一系列的 transformations 定义 RDD 之后,就可以调用 actions 触发 RDD 的计算,action 可以是向应用程序返回结果(count, collect 等),或者是向存储系统保存数据(saveAsTextFile 等)。在 Spark 中,只有遇到 action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker,如下图所示。Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。
Driver 和 Worker 内部示意图:
2.2 RDD 创建
在 Spark 中创建 RDD 的创建方式大概可以分为三种:
1.从集合中创建 RDD;
2.从外部存储创建 RDD;
3.从其他 RDD 创建。
2.3 RDD 编程
RDD 一般分为数值 RDD 和键值对 RDD,本章不进行具体区分,先统一来看,下一章会对键值对 RDD 做专门说明。
2.3.1 Transformation(转换)
RDD 中的所有转换都是延迟加载的
,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给 Driver
的动作时,这些转换才会真正运行。这种设计让 Spark 更加有效率地运行。
常用的 Transformation 如下:
/** * map算子案例:将集合中每一个元素都乘以2 */
private static void map(){ //创建SparkConf SparkConf conf=new SparkConf().setAppName("map").setMaster("local"); // 创建JavaSparkContext JavaSparkContext sparkContext=new JavaSparkContext(conf); // 构造集合 List<Integer> numbers= Arrays.asList(1,2,3,4,5,6,7); // 并行化集合,创建初始RDD JavaRDD<Integer> numberRDD=sparkContext.parallelize(numbers); // 使用map算子,将集合中的每个元素都乘以2 // map算子,是对任何类型的RDD,都可以调用的 // 在java中,map算子接收的参数是Function对象 // 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型 // 同时call()方法的返回类型,也必须与第二个泛型类型同步 // 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素 // 所有新的元素就会组成一个新的RDD JavaRDD<Integer> doubleNumberRDD=numberRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) throws Exception { return integer*3; } }); // 打印新的RDD doubleNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer integer) throws Exception { System.out.println(integer); } }); // 关闭JavaSparkContext sparkContext.close(); }
/** * filter算子案例:过滤集合中的偶数 */
private static void filter() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("filter") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // 并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 对初始RDD执行filter算子,过滤出其中的偶数 // filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的 // 但是,唯一的不同,就是call()方法的返回类型是Boolean // 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑 // 来判断这个元素是否是你想要的 // 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回false JavaRDD<Integer> evenNumberRDD=numberRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer integer) throws Exception { return integer%2==0; } }); //打印 evenNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer integer) throws Exception { System.out.println(integer); } }); // 关闭JavaSparkContext sc.close(); }
/** * flatMap案例:将文本行拆分为多个单词 */
private static void flatMap() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("flatMap") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 构造集合 List<String> lineList = Arrays.asList("hello you", "hello me", "hello world"); // 并行化集合,创建RDD JavaRDD<String> lines = sc.parallelize(lineList); // 对RDD执行flatMap算子,将每一行文本,拆分为多个单词 // flatMap算子,在java中,接收的参数是FlatMapFunction // 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型 // call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同 // flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素 // 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合 // 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 >= 原始RDD的大小 JavaRDD<String> words=lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); words.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); // 关闭JavaSparkContext sc.close(); }
/** * groupByKey案例:按照班级对成绩进行分组 */
private static void groupByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("groupByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 65)); // 并行化集合,创建JavaPairRDD JavaPairRDD<String,Integer> scores=sc.parallelizePairs(scoreList); // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组 // groupByKey算子,返回的还是JavaPairRDD // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型 // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable // 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据 JavaPairRDD<String,Iterable<Integer>> groupedScores=scores.groupByKey(); // 打印groupedScores RDD groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() { @Override public void call(Tuple2<String, Iterable<Integer>> s) throws Exception { System.out.println("=========================="); System.out.println(s._1); Iterator<Integer> ite=s._2.iterator(); while (ite.hasNext()){ System.out.println(ite.next()); } } }); // 关闭JavaSparkContext sc.close(); }
/** * reduceByKey案例:统计每个班级的总分 */
private static void reduceByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("reduceByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 65)); // 并行化集合,创建JavaPairRDD JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); // 针对scores RDD,执行reduceByKey算子 // reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值 // 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型 // 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入 // 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型 // 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的 // reduceByKey算法返回的RDD,还是JavaPairRDD<key, value> JavaPairRDD<String,Integer> totalScores=scores.reduceByKey(new Function2<Integer, Integer, Integer>() { // 对每个key,都会将其value,依次传入call方法 // 从而聚合出每个key对应的一个value // 然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素 @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); // 打印totalScores RDD totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println(s._1+":"+s._2); } }); // 关闭JavaSparkContext sc.close(); }
reduceByKey 和 groupByKey 的区别:
/** * sortByKey案例:按照学生分数进行排序 */
private static void sortByKey() {
// 创建SparkConf SparkConf conf = new SparkConf() .setAppName("sortByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> scoreList = Arrays.asList( new Tuple2<Integer, String>(65, "leo"), new Tuple2<Integer, String>(50, "tom"), new Tuple2<Integer, String>(100, "marry"), new Tuple2<Integer, String>(80, "jack")); // 并行化集合,创建RDD JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList); // 对scores RDD执行sortByKey算子 // sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序 // 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的 // 但是就是RDD中的元素的顺序,不同了 JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(true); // 打印sortedScored RDD sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, String> t) throws Exception { System.out.println(t._1 + ": " + t._2); } }); // 关闭JavaSparkContext sc.close(); }
/** * join案例:打印学生成绩 */
private static void join() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("join") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 60)); //并行化两个rdd JavaPairRDD<Integer, String> students=sc.parallelizePairs(studentList); JavaPairRDD<Integer, Integer> scores=sc.parallelizePairs(scoreList); JavaPairRDD<Integer,Tuple2<String,Integer>> studentScores=students.join(scores); studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() { @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { System.out.println("==============================="); System.out.println("class:"+t._1); System.out.println("name:"+t._2._1); System.out.println("name:"+t._2._2); } }); // 关闭JavaSparkContext sc.close(); }
/** * cogroup案例:打印学生成绩 */
private static void cogroup() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("cogroup") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80), new Tuple2<Integer, Integer>(3, 50)); // 并行化两个RDD JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList); JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList); //cogroup与join不同 // 相当于是,一个key join上的所有value,都给放到一个Iterable里面去了 // cogroup,不太好讲解,希望大家通过动手编写我们的案例,仔细体会其中的奥妙 JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> studentScores=students.cogroup(scores); studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() { @Override public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception { System.out.println("student id: " + t._1); System.out.println("student_name:"+t._2._1); System.out.println("student score: " + t._2._2); System.out.println("==============================="); } }); }
2.3.2 Action(行动)
常用的 Action 如下:
1.reduce
private static void reduce() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("reduce") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用reduce操作对集合中的数字进行累加 // reduce操作的原理: // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3 // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6 // 以此类推 // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素 int sum=numbers.reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); System.out.println(sum); // 关闭JavaSparkContext sc.close();}
2.collect
private static void collect() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("collect") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用map操作将集合中所有数字乘以2 JavaRDD<Integer> doubleNumbers=numbers.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) throws Exception { return integer*2; } }); // 不用foreach action操作,在远程集群上遍历rdd中的元素 // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地 // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条 // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地 // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出 // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理 List<Integer> doubleNumberList = doubleNumbers.collect(); for(Integer num : doubleNumberList) { System.out.println(num); } // 关闭JavaSparkContext sc.close();}
3.count
private static void count() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("count") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 对rdd使用count操作,统计它有多少个元素 long count = numbers.count(); System.out.println(count); // 关闭JavaSparkContext sc.close();}
4.take
private static void take() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("take") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 对rdd使用count操作,统计它有多少个元素 // take操作,与collect类似,也是从远程集群上,获取rdd的数据 // 但是collect是获取rdd的所有数据,take只是获取前n个数据 List<Integer> top3Numbers = numbers.take(3); for(Integer num : top3Numbers) { System.out.println(num); } // 关闭JavaSparkContext sc.close();}
5.saveAsTextFile
private static void saveAsTextFile() { // 创建SparkConf和JavaSparkContext SparkConf conf = new SparkConf() .setAppName("saveAsTextFile"); JavaSparkContext sc = new JavaSparkContext(conf); // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加 List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numbers = sc.parallelize(numberList); // 使用map操作将集合中所有数字乘以2 JavaRDD<Integer> doubleNumbers = numbers.map( new Function<Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); // 直接将rdd中的数据,保存在HFDS文件中 // 但是要注意,我们这里只能指定文件夹,也就是目录 // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件 doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt"); // 关闭JavaSparkContext sc.close();}
7.countByKey
@SuppressWarnings("unchecked")private static void countByKey() { // 创建SparkConf SparkConf conf = new SparkConf() .setAppName("countByKey") .setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 模拟集合 List<Tuple2<String, String>> scoreList = Arrays.asList( new Tuple2<String, String>("class1", "leo"), new Tuple2<String, String>("class2", "jack"), new Tuple2<String, String>("class1", "marry"), new Tuple2<String, String>("class2", "tom"), new Tuple2<String, String>("class2", "david")); // 并行化集合,创建JavaPairRDD JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList); // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数 // 这就是countByKey的作用 // countByKey返回的类型,直接就是Map<String, Object> Map<String, Long> studentCounts = students.countByKey(); for(Map.Entry<String, Long> studentCount : studentCounts.entrySet()) { System.out.println(studentCount.getKey() + ": " + studentCount.getValue()); } // 关闭JavaSparkContext sc.close();}