Spark Core读取ES的分区问题分析

Stella981
• 阅读 620

ES也是比较火热,在日志数据分析,规则分析等确实很方便,说实话用es stack 浪尖觉得可以解决很多公司的数据分析需求。极客时间下周一要上线新的ES课程,有需要的暂时别购买,到时候还找浪尖返现吧。

写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?

稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢?

可想的具体关系可能是以下两种:

1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。

2).ES支持游标查询,那么是不是也可以对比较大的分片进行拆分成多个RDD分区呢?

那么下面浪尖带着大家翻一下源码看看具体情况。

1.Spark Core读取ES

ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version = 2.7.1``hadoop22Version = 2.2.0``spark13Version = 1.6.2``spark20Version = 2.3.0

浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

a,导入整个elasticsearch-hadoop包

<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>7.1.1</version> </dependency>

b,只导入spark模块的包

<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>7.1.1</version> </dependency>

浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:

import org.apache.spark.{SparkConf, SparkContext}``import org.elasticsearch.hadoop.cfg.ConfigurationOptions``object es2sparkrdd { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName) conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") conf.set(ConfigurationOptions.ES_PORT, "9200") conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true") conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true") conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")``// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)``// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd) conf.set("es.write.rest.error.handlers", "ignoreConflict") conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler") val sc = new SparkContext(conf) import org.elasticsearch.spark._ sc.esRDD("posts").foreach(each=>{ each._2.keys.foreach(println) }) sc.esJsonRDD("posts").foreach(each=>{ println(each._2) }) sc.stop() }``}

可以看到Spark Core读取RDD主要有两种形式的API:

a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

RDD[(String, String)]

虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。

要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。

2.源码分析

首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。

废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容如下:

override def getPartitions: Array[Partition] = { esPartitions.zipWithIndex.map { case(esPartition, idx) => new EsPartition(id, idx, esPartition) }.toArray }

esPartitions是一个lazy型的变量:

@transient private[spark] lazy val esPartitions = { RestService.findPartitions(esCfg, logger) }

这种声明原因是什么呢?

lazy+transient的原因大家可以考虑一下。

RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。

final List<PartitionDefinition> partitions;``// 5.x及以后版本 同时没有配置es.input.max.docs.per.partition``if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) { partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);``} else { partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);``}

a).findSlicePartitions

这个方法其实就是在5.x及以后的ES版本,同时配置了

es.input.max.docs.per.partition

以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

long numDocs;``if (readResource.isTyped()) { numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);``} else { numDocs = client.countIndexShard(index, Integer.toString(shardId), query);``}``int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);``for (int i = 0; i < numPartitions; i++) { PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions); partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));``}

实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。

这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。

b).findShardPartitions方法

这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,``locationList.toArray(new String[0]));``partitions.add(partition);

3.总结

以上就是Spark Core读取ES数据的时候分片和RDD分区的对应关系分析,默认情况下是一个es 索引分片对应Spark RDD的一个分区。假如分片数过大,且ES版本在5.x及以上,可以配置参数

es.input.max.docs.per.partition

进行拆分。

更多Spark,flink及大数据问题,欢迎加入浪尖知识星球学习~

Spark Core读取ES的分区问题分析

本文分享自微信公众号 - 浪尖聊大数据(bigdatatip)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
JS 苹果手机日期显示NaN问题
问题描述newDate("2019122910:30:00")在IOS下显示为NaN原因分析带的日期IOS下存在兼容问题解决方法字符串替换letdateStr"2019122910:30:00";datedateStr.repl
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Easter79 Easter79
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这