import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * cogroup(otherDataset, [numTasks]) operator:
 * groups the data of two RDDs by key.
 * The first RDD's values for each key are collected into one Iterable,
 * and the second RDD's values for the same key into another Iterable.
 * The result pairs each key with the two Iterables.
 */
public class CogroupOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("cogroup");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Student records: (student id, class)
        List<Tuple2<String, String>> stus = Arrays.asList(
                new Tuple2<>("w1", "z1"),
                new Tuple2<>("w2", "z2"),
                new Tuple2<>("w3", "z3"),
                new Tuple2<>("w2", "z2"),
                new Tuple2<>("w1", "z1")
        );
        // Score records: (student id, score)
        List<Tuple2<String, String>> scores = Arrays.asList(
                new Tuple2<>("w1", "100"),
                new Tuple2<>("w2", "10"),
                new Tuple2<>("w3", "1"),
                new Tuple2<>("w2", "90"),
                new Tuple2<>("w1", "900")
        );

        JavaPairRDD<String, String> stusrdd = sc.parallelizePairs(stus);
        JavaPairRDD<String, String> scorerdd = sc.parallelizePairs(scores);

        // cogroup by key; the second argument (2) is numTasks,
        // the number of partitions of the resulting RDD.
        JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> result =
                stusrdd.cogroup(scorerdd, 2);

        result.foreach(new VoidFunction<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>() {
            @Override
            public void call(Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> tuple) throws Exception {
                System.err.println("key:" + tuple._1 + ", data from the two iterables: " + tuple._2);
            }
        });

        sc.close(); // release the local SparkContext
    }
}
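Since VoidFunction is a functional interface, the foreach above can also be written as a Java 8 lambda. The sketch below shows the terser form together with the per-key groupings expected from the sample data; note that the order in which keys print is not guaranteed across the two partitions, and Spark renders each Iterable with its internal CompactBuffer toString:

// Lambda equivalent of the anonymous VoidFunction above.
result.foreach(tuple ->
        System.err.println("key:" + tuple._1 + ", data from the two iterables: " + tuple._2));

// Expected per-key groupings (print order may vary; Iterables show as CompactBuffer(...)):
// key:w1, data from the two iterables: (CompactBuffer(z1, z1),CompactBuffer(100, 900))
// key:w2, data from the two iterables: (CompactBuffer(z2, z2),CompactBuffer(10, 90))
// key:w3, data from the two iterables: (CompactBuffer(z3),CompactBuffer(1))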