本文分享自天翼云开发者社区《spark-sql优化简述》,作者:徐****东
1、自适应中reduce参数控制 spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。 spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。 spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。
2、合理设置单partition读取数据量 SET spark.sql.files.maxPartitionBytes=xxxx;
3、合理设置shuffle partition的数量 SET spark.sql.shuffle.partitions=xxxx
4、使用coalesce & repartition调整partition数量 SELECT /+ COALESCE(3) */ * FROM EMP_TABLE SELECT /+ REPARTITION(3) / * FROM EMP_TABLE SELECT /+ REPARTITION(c) / * FROM EMP_TABLE SELECT /+ REPARTITION(3, dept_col) / * FROM EMP_TABLE SELECT /+ REPARTITION_BY_RANGE(dept_col) / * FROM EMP_TABLE SELECT /+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
5、使用broadcast join
6、开启Adaptive Query Execution(Spark 3.0) 6.1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区 spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数 spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小 spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数
当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并 spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小
6.2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join
6.3、动态申请资源: 当计算过程中资源不足会自动申请资源 spark.sql.adaptive.enabled: 是否开启AQE优化 spark.dynamicAllocation.enabled: 是否开启动态资源申请 spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪
6.4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。 spark.sql.adaptive.enabled: 是否开启AQE优化 倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N 倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M 拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes G [代表优化之后,分区数数据的预期大小]
sparksql判断出现数据倾斜的依据[需要两个条件同时满足]: 当某个分区处理的数据量>= N * 所有task处理数据量的中位数 当某个分区处理的数据量>= M
7、文件与分区 SET spark.sql.files.maxPartitionBytes=xxx //读取文件的时候一个分区接受多少数据; spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值
8、CBO优化 spark.sql.cbo.enabled: 是否开启cbo优化 spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序 spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序
9、hints优化 hints预防主要用在分区和join上。
Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE
Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
10、缓存表 对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(TableName)或者DataFrame.cache即可,SparkSQL会用内存列存储的格式进行表的缓存,然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和GC的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用SQLContext.setConf()设置,可以通过 spark.sql.inMemoryColumnarStorage.batchSize 这个参数,默认10000,配置列存储单位。 永久视图 view:永久保存一段查询语句的逻辑,而不是查询语句的数据,永久有效,查询这个视图,相当于查询一个SQL语句,如果保存的查询逻辑复杂,这查询视图也耗时长。支持重新覆盖 create or replace view view1 as 临时视图 temporary view:只在当前会话生效,如果会话结束,则临时视图失效,支持重新覆盖 create or replace temporary view temp_view1 as,类似于 SparkSQL 中的 DataFrame.createOrReplaceTempView('视图名'),hive不支持这个语法 缓存表cache table:只在当前会话有效,将一段查询结果集缓存到内存,并赋予一个表名。 table:永久有效,保存数据结构和数据本身到磁盘。 with as:当子查询的嵌套层数太多时,可以用with as 增加可读性。
11、group by优化 为了提高 group by 查询的性能,可以尝试以下几种方法: 仅选择必要的字段进行 group by 操作,避免选择过多的字段。 尽可能将 group by 字段类型保持一致,以减少数据转换的开销。 如果可能,可以将 group by 字段进行哈希分区,以减少数据传输和处理的开销。 如果使用的是字符串类型,可以考虑使用哈希函数来减少字符串比较的开销。
12、优化倾斜连接 数据偏斜会严重降低联接查询的性能。此功能通过将倾斜的任务拆分(按需复制)为大小大致相等的任务来动态处理排序合并联接中的倾斜。同时启用spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置时,此选项才生效。