Spark SQL重点知识总结

Stella981
• 阅读 903

一、Spark SQL的概念理解

Spark SQL是spark套件中一个模板,它将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。

Spark SQL的特点:
1、和Spark Core的无缝集成,可以在写整个RDD应用的时候,配置Spark SQL来完成逻辑实现。
2、统一的数据访问方式,Spark SQL提供标准化的SQL查询。
3、Hive的继承,Spark SQL通过内嵌的hive或者连接外部已经部署好的hive案例,实现了对hive语法的继承和操作。
4、标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用

Spark SQL数据抽象:
1、RDD(Spark1.0)->DataFrame(Spark1.3)->DataSet(Spark1.6)
2、Spark SQL提供了DataFrame和DataSet的数据抽象
3、DataFrame就是RDD+Schema,可以认为是一张二维表格,劣势在于编译器不进行表格中的字段的类型检查,在运行期进行检查
4、DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型
5、DataFrame=DataSet[Row]
6、DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。

Spark SQL客户端查询:
1、可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名
2、可以通过Spark提供的方法读取json文件,将json文件转换成DataFrame
3、可以通过DataFrame提供的API来操作DataFrame里面的数据。
4、可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。

二、Spark SQL查询方式

DataFrame查询方式

1、DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格
(1)、DSL风格:
需要引入import spark.implicit._这个隐式转换,可以将DataFrame隐式转换成RDD
(2)、SQL风格:
a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp
b、需要通过sparkSession.sql方法来运行你的SQL语句

DataSet查询方式

定义一个DataSet,先定义一个Case类

三、DataFrame、Dataset和RDD互操作

1、RDD->DataFrame:

  • 普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF("name","age")

  • 通过反射来设置schema,例如:

    #通过反射设置schema,数据集是spark自带的people.txt,路径在下面的代码中

Spark SQL重点知识总结

#注册成一张临时表

Spark SQL重点知识总结

这时teen是一张表,每一行是一个row对象,如果需要访问Row对象中的每一个元素,可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name")

Spark SQL重点知识总结 也可以使用getAs方法:

Spark SQL重点知识总结

3、通过编程的方式来设置schema,适用于编译器不能确定列的情况

val peopleRDD=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt")

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

2、DataFrame->RDD:

dataFrame.rdd

3、RDD->DataSet:

rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS

4、DataSet->DataSet:


dataSet.rdd

5、DataFrame -> DataSet:

dataFrame.to[Person]

6、DataSet -> DataFrame:

dataSet.toDF

四、用户自定义函数

1、用户自定义UDF函数

通过spark.udf功能用户可以自定义函数
自定义udf函数:
1、  通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。
2、  需要将一个DF或者DS注册为一个临时表
3、  通过spark.sql去运行一个SQL语句,在SQL语句中可以通过name(列名)方式来应用UDF函数

2、用户自定义聚合函数

弱类型用户自定义聚合函数

  • 新建一个Class 继承UserDefinedAggregateFunction  ,然后复写方法:

    override def inputSchema: StructType = ???

  • 你需要通过spark.udf.resigter去注册你的UDAF函数。

  • 需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。

强类型用户自定义聚合函数

1、新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写相对应的方法:

override def zero: Average = ???

2、新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。

五、Spark SQL和Hive的继承

内置Hive

1、Spark内置有Hive,Spark2.1.1 内置的Hive是1.2.1。
2、需要将core-site.xml和hdfs-site.xml 拷贝到spark的conf目录下。如果Spark路径下发现metastore_db,需要删除【仅第一次启动的时候】。
3、在你第一次启动创建metastore的时候,你需要指定spark.sql.warehouse.dir这个参数,
比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
4、注意,如果你在load数据的时候,需要将数据放到HDFS上。

外部Hive(这里主要使用这个方法)

1、需要将hive-site.xml 拷贝到spark的conf目录下。
2、如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。

3、可以通过spark-sql或者spark-shell来进行sql的查询。完成和hive的连接。

Spark SQL重点知识总结

这就是hive里面的表

Spark SQL重点知识总结

六、Spark SQL的数据源

输入

对于Spark SQL的输入需要使用sparkSession.read方法

1、通用模式   sparkSession.read.format("json").load("path")   支持类型:parquet、json、text、csv、orc、jdbc
2、专业模式   sparkSession.read.json、 csv  直接指定类型。

输出

对于Spark SQL的输出需要使用  sparkSession.write方法

1、通用模式   dataFrame.write.format("json").save("path")  支持类型:parquet、json、text、csv、orc

2、专业模式   dataFrame.write.csv("path")  直接指定类型

3、如果你使用通用模式,spark默认parquet是默认格式、sparkSession.read.load 加载的默认是parquet格式dataFrame.write.save也是默认保存成parquet格式。

4、如果需要保存成一个text文件,那么需要dataFrame里面只有一列(只需要一列即可)。

七、Spark SQL实战

1、数据说明(有需要的可以下方留言)

这里有三个数据集,合起来大概有几十万条数据,是关于货品交易的数据集。

2、任务

这里有三个需求:
1、计算所有订单中每年的销售单数、销售总额
2、计算所有订单每年最大金额订单的销售额
3、计算所有订单中每年最畅销货品

3、步骤

1、加载数据:

tbStock.txt

#代码

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

tbStockDetail.txt

case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

tbDate.txt

case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable

Spark SQL重点知识总结 Spark SQL重点知识总结 Spark SQL重点知识总结 Spark SQL重点知识总结

2、注册表

tbStockDS.createOrReplaceTempView("tbStock")

Spark SQL重点知识总结

3、解析表

1、计算所有订单中每年的销售单数、销售总额

select c.theyear,count(distinct a.ordernumber),sum(b.amount)

Spark SQL重点知识总结

2、计算所有订单每年最大金额订单的销售额

a、先统计每年每个订单的销售额

select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount

Spark SQL重点知识总结

b、计算最大金额订单的销售额

select d.theyear,c.SumOfAmount as SumOfAmount 

Spark SQL重点知识总结

3、计算所有订单中每年最畅销货品

a、求出每年每个货品的销售额

select c.theyear,b.itemid,sum(b.amount) as SumOfAmount 

Spark SQL重点知识总结

b、在a的基础上,统计每年单个货品的最大金额

select d.theyear,max(d.SumOfAmount) as MaxOfAmount 

Spark SQL重点知识总结

c、用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息

select distinct e.theyear,e.itemid,f.maxofamount 

Spark SQL重点知识总结

Spark SQL重点知识总结

Spark SQL重点知识总结

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这