springboot基于spark

Easter79
• 阅读 1122

参考文章:使用springboot构建rest api远程提交spark任务

spark-submit动态提交的办法(SparkLauncher实战)

用java提交一个Spark应用程序

Spark-利用SparkLauncher 类以JAVA API 编程的方式提交spark job--impt

官网API参考: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

github代码链接:github地址

1. spark集群及版本信息
服务器版本:centos7
hadoop版本:2.8.3
spark版本:2.3.3
使用springboot构建rest api远程提交spark任务,将数据库中的表数据存储到hdfs上,任务单独起一个项目,解除与springboot项目的耦合

2. 构建springboot项目
1. pom配置
    
        <java.version>1.8</java.version>
        <spark.version>2.3.3</spark.version>
        <scala.version>2.11</scala.version>
   

   
       
            org.springframework.boot
            spring-boot-starter-web
       

       
            org.springframework.boot
            spring-boot-starter
       

       
            mysql
            mysql-connector-java
            5.1.46
       

       
            org.apache.spark
            spark-launcher_${scala.version}
            ${spark.version}
       

       
            org.projectlombok
            lombok
       

       
            com.alibaba
            fastjson
            1.2.49
       

       
            org.springframework.boot
            spring-boot-starter-test
            test
       

   

   
        spark
       
           
                org.springframework.boot
                spring-boot-maven-plugin
               
                    com.hrong.springbootspark.SpringbootSparkApplication
               

               
                   
                       
                            repackage
                       

                   

               

           

       

   

2. 项目结构

3. 编写代码
1. 创建spark任务实体
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

/**
 * @Author hrong
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SparkApplicationParam {
    /**
     * 任务的主类
     */
    private String mainClass;
    /**
     * jar包路径
     */
    private String jarPath;
    @Value("${spark.master:yarn}")
    private String master;
    @Value("${spark.deploy.mode:cluster}")
    private String deployMode;
    @Value("${spark.driver.memory:1g}")
    private String driverMemory;
    @Value("${spark.executor.memory:1g}")
    private String executorMemory;
    @Value("${spark.executor.cores:1}")
    private String executorCores;
    /**
     * 其他配置:传递给spark job的参数
     */
    private Map<String, String> otherConfParams;

    /**
     * 调用该方法可获取spark任务的设置参数
     * @return SparkApplicationParam
     */
    public SparkApplicationParam getSparkApplicationParam(){
        return new SparkApplicationParam(mainClass, jarPath, master, deployMode, driverMemory, executorMemory, executorCores, otherConfParams);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2. 任务参数对象
每个任务执行的时候都必须指定运行参数,所以要继承SparkApplicationParam对象

import com.hrong.springbootspark.entity.SparkApplicationParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author hrong
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataBaseExtractorVo extends SparkApplicationParam {
    /**
     * 数据库连接地址
     */
    private String url;
    /**
     * 数据库连接账号
     */
    private String userName;
    /**
     * 数据库密码
     */
    private String password;
    /**
     * 指定的表名
     */
    private String table;
    /**
     * 目标文件类型
     */
    private String targetFileType;
    /**
     * 目标文件保存路径
     */
    private String targetFilePath;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
3. 定义spark提交方法
1. 定义interface
每个spark任务运行时都需要指定运行参数,但是任务内部所需的参数不一样,所以第一个参数为通用的参数对象,第二个参数为可变参数,根据不同的任务来进行传值

import com.hrong.springbootspark.entity.SparkApplicationParam;

import java.io.IOException;

/**
 * @Author hrong
 * @description spark任务提交service
 **/
public interface ISparkSubmitService {
    /**
     * 提交spark任务入口
     * @param sparkAppParams spark任务运行所需参数
     * @param otherParams 单独的job所需参数
     * @return 结果
     * @throws IOException          io
     * @throws InterruptedException 线程等待中断异常
     */
    String submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws IOException, InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2. 具体实现
import com.alibaba.fastjson.JSONObject;
import com.hrong.springbootspark.entity.SparkApplicationParam;
import com.hrong.springbootspark.service.ISparkSubmitService;
import com.hrong.springbootspark.util.HttpUtil;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * @Author hrong
 **/
@Service
public class SparkSubmitServiceImpl implements ISparkSubmitService {

    private static Logger log = LoggerFactory.getLogger(SparkSubmitServiceImpl.class);

    @Value("${driver.name:n151}")
    private String driverName;

    @Override
    public String submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws IOException, InterruptedException {
        log.info("spark任务传入参数:{}", sparkAppParams.toString());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Map<String, String> confParams = sparkAppParams.getOtherConfParams();
        SparkLauncher launcher = new SparkLauncher()
                .setAppResource(sparkAppParams.getJarPath())
                .setMainClass(sparkAppParams.getMainClass())
                .setMaster(sparkAppParams.getMaster())
                .setDeployMode(sparkAppParams.getDeployMode())
                .setConf("spark.driver.memory", sparkAppParams.getDriverMemory())
                .setConf("spark.executor.memory", sparkAppParams.getExecutorMemory())
                .setConf("spark.executor.cores", sparkAppParams.getExecutorCores());
        if (confParams != null && confParams.size() != 0) {
            log.info("开始设置spark job运行参数:{}", JSONObject.toJSONString(confParams));
            for (Map.Entry<String, String> conf : confParams.entrySet()) {
                log.info("{}:{}", conf.getKey(), conf.getValue());
                launcher.setConf(conf.getKey(), conf.getValue());
            }
        }
        if (otherParams.length != 0) {
            log.info("开始设置spark job参数:{}", otherParams);
            launcher.addAppArgs(otherParams);
        }
        log.info("参数设置完成,开始提交spark任务");
        SparkAppHandle handle = launcher.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        log.info("stateChanged:{}", sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        log.info("infoChanged:{}", sparkAppHandle.getState().toString());
                    }
                });
        log.info("The task is executing, please wait ....");
        //线程等待任务结束
        countDownLatch.await();
        log.info("The task is finished!");
        //通过Spark原生的监测api获取执行结果信息,需要在spark-default.xml、spark-env.sh、yarn-site.xml进行相应的配置
        String estUrl = "http://"+driverName+":18080/api/v1/applications/" + handle.getAppId();
        return HttpUtil.httpGet(restUrl, null);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
4. Controller写法
controller主要的职责就是接受页面的参数,将参数传递到service层

import com.hrong.springbootspark.service.ISparkSubmitService;
import com.hrong.springbootspark.vo.DataBaseExtractorVo;
import com.hrong.springbootspark.vo.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.io.IOException;

/**
 * @Author hrong
 **/
@Slf4j
@Controller
public class SparkController {
    @Resource
    private ISparkSubmitService iSparkSubmitService;
    /**
     * 调用service进行远程提交spark任务
     * @param vo 页面参数
     * @return 执行结果
     */
    @ResponseBody
    @PostMapping("/extract/database")
    public Object dbExtractAndLoad2Hdfs(@RequestBody DataBaseExtractorVo vo){
        try {
            return iSparkSubmitService.submitApplication(vo.getSparkApplicationParam(),
                    vo.getUrl(),
                    vo.getTable(),
                    vo.getUserName(),
                    vo.getPassword(),
                    vo.getTargetFileType(),
                    vo.getTargetFilePath());
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            log.error("执行出错:{}", e.getMessage());
            return Result.err(500, e.getMessage());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
3. 构建Spark任务项目(Maven项目)
1. pom配置

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <hadoop.version>2.8.3</hadoop.version>
        <spark.version>2.3.3</spark.version>
        <scala.version>2.11</scala.version>
        <scala-library.version>2.11.8</scala-library.version>
        <mysql.version>5.1.46</mysql.version>
        <oracle.version>11g</oracle.version>
        <codehaus.version>3.0.10</codehaus.version>
   

   
       
            mysql
            mysql-connector-java
            ${mysql.version}
       

       
       
            com.oracle.driver
            jdbc-driver
            ${oracle.version}
       

       
       
            org.apache.spark
            spark-core_${scala.version}
            ${spark.version}
            provided
       

       
            org.apache.spark
            spark-sql_${scala.version}
            ${spark.version}
            provided
       

       
            org.codehaus.janino
            commons-compiler
            ${codehaus.version}
       

       
            org.scala-lang
            scala-library
            ${scala-library.version}
            provided
       

       
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            provided
       

   

   
        spark-job
       
           
               
                    net.alchim31.maven
                    scala-maven-plugin
                    3.2.2
               

           

       

       
           
                net.alchim31.maven
                scala-maven-plugin
               
                   
                        scala-compile-first
                        process-resources
                       
                            add-source
                            compile
                       

                   

                   
                        scala-test-compile
                        process-test-resources
                       
                            testCompile
                       

                   

               

           

           
                org.apache.maven.plugins
                maven-compiler-plugin
               
                   
                        compile
                       
                            compile
                       

                   

               

           

           
                org.apache.maven.plugins
                maven-shade-plugin
                2.4.3
               
                   
                        package
                       
                            shade
                       

                       
                           
                               
                                    *:*
                                   
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                   

                               

                           

                       

                   

               

           

       

   

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
2. 项目结构

3. spark job代码
获取外部参数,连接数据库,并将指定表中的数据根据指定的格式、目录转存到hdfs上

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author hrong
 * @Description 将数据库中的表数据保存到hdfs上
 **/
public class DbTableEtl {
    private static Logger log = LoggerFactory.getLogger(DbTableEtl.class);

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName(DbTableEtl.class.getSimpleName())
                .getOrCreate();
        String url = args[0];
        String dbtable = args[1];
        String user = args[2];
        String password = args[3];
        String targetFileType = args[4];
        String targetFilePath = args[5];
        Dataset dbData = spark.read()
                .format("jdbc")
                .option("url", url)
                .option("dbtable", dbtable)
                .option("user", user)
                .option("password", password)
                .load();
        log.info("展示部分样例数据,即将开始导入到hdfs");
        dbData.show(20, false);
        dbData.write().mode("overwrite").format(targetFileType).save(targetFilePath);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
3. 项目打包
直接使用IDEA自带打包功能

1. springboot项目

2. Spark job项目

4. 上传至服务器

5. 将spark-job上传至hdfs

6. 启动springboot项目

7. 使用postman调用接口
指定jarPath、mainClass、deployMode以及任务所需参数

8. 调用结果
程序开始提交任务

程序执行结束

代码放在了github上面,链接:github地址

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法参考文章:(1)Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.codeprj.com%2Fblo
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k