Spark scala和java的api使用

Stella981
• 阅读 764

1、利用scala语言开发spark的worcount程序(本地运行)

package com.zy.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//todo:利用scala语言来实现spark的wordcount程序
object WordCount {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName和master  local[2]表示本地采用2个线程去运行任务
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件
    val data: RDD[String] = sc.textFile("D:\\words.txt")

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //按照单词出现的次数降序排列
    val sortRDD: RDD[(String, Int)] = result.sortBy(x => x._2, false)


    //7、收集数据,打印输出
    val finalResult: Array[(String, Int)] = sortRDD.collect()
    finalResult.foreach(println)

    //8、关闭sc
    sc.stop()
  }
}

2、利用scala语言开发spark的wordcount程序(集群运行)

package com.zy.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//todo:利用scala语言开发spark的wordcount程序(集群运行)
object WordCount_Online {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件 args(0)为文件地址参数
    val data: RDD[String] = sc.textFile(args(0))

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //7、把结果数据保存到hdfs上  args(1)是保存到hdfs的目录参数
    result.saveAsTextFile(args(1))

    //8、关闭sc
    sc.stop()
  }

}

最后打成jar包 到集群上执行

spark-submit --master spark://node1:7077 --class cn.itcast.spark.WordCount_Online --executor-memory 1g --total-executor-cores 2 original-spark_xxx-1.0-SNAPSHOT.jar /words.txt /out

3、利用java语言开发spark的wordcount程序(本地运行)

package com.zy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//todo:利用java语言开发spark的wordcount程序(本地运行)
public class WordCount_Java {
    public static void main(String[] args) {
        //1、创建SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、创建JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、读取数据文件
        JavaRDD<String> data = jsc.textFile("D:\\words.txt");

        //4、切分每一行获取所有的单词
        JavaRDD<String> words = data.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        //6、相同单词出现1累加
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照单词出现的次数降序排列 (单词,次数)------>(次数,单词).sortByKey------->(单词,次数)

        JavaPairRDD<Integer, String> reverseRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        });

        JavaPairRDD<String, Integer> sortedRDD = reverseRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        });


        //7、收集数据打印输出
        List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
        for (Tuple2<String, Integer> tuple : finalResult) {
            System.out.println("单词:" + tuple._1 + " 次数:" + tuple._2);
        }

        //8、关闭jsc
        jsc.stop();
    }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
ES6 新增的数组的方法
给定一个数组letlist\//wu:武力zhi:智力{id:1,name:'张飞',wu:97,zhi:10},{id:2,name:'诸葛亮',wu:55,zhi:99},{id:3,name:'赵云',wu:97,zhi:66},{id:4,na
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这