ES 源代码阅读(二)

Wesley13
• 阅读 585

1 基本概念

集群: 一个集群有一个或多个节点组织在一起,并将数据组织在一起,提供索引和搜索服务.
节点:一个节点是一个集群中的服务器,提供存储数据,提供搜索服务.
索引:文档的逻辑的集合
分片:一个逻辑索引有若干分片,其中一个分片被设置为主分片.分片为索引的存储位置. 会涉及到分布式问题.
类型:文档的类型
文档:与lucene中的document类似.

如果集群状态发生改变,发生改变状态的节点,先通知其他节点更新状态,最后才更新本地节点
的状态.因此在提交版本信息同步时,都会涉及到这个过程.

2 提交请求初次创建索引

概述:

解析请求,根据请求创建index和mapping,并提交版本信息到集群.然后根据配置创建0,1,2,3
的shard(分片)然后更新版本信息到集群.最后创建shard(分片)4 然后更新版本信息到集群.
最后根据请求更新已有的mapper信息,最后发布到集群中各个节点,然后更新版本信息.

curl -XPOST 'localhost:9200/twitter/tweet/1' -d '{"name":"parker","age":18}'
源码阅读:
HttpServer.internalDispatchRequest (HttpRequest,HttpChannel) ->
(RestController)restController.dispatchRequest(HttpRequest,HttpChannel) ->
RestController.executeHandler(HttpRequest,HttpChannel)->
获取RestHandler->
handler.handleRequest(request, channel)->
BaseRestHandler.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
RestIndexAction.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
创建IndexRequest对象.
client.index(indexRequest,RestBuilderListener)->
AbstraceClient.index(final IndexRequest request, final ActionListener listener)->
execute(IndexAction.INSTANCE, request, listener)->
BaseRestHandler.execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener listener)
处理前的准备工作copyHeaderSAndContext(request, restRequest, headers)->
<-copyHeaderSAndContext
FilterClient.execute(action, request, listener)->
(TransportAction)transportAction.execute(request, listener)->
TransportAction.doExecute(request, listener)->
TransportIndexAction.doExecute(final IndexRequest request, final ActionListener listener)->
判断是否需要新建索引,如果需要则新建,否则直接建立索引.
如果需要创建索引,先构造创建索引的请求.然后创建索引
(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener)->
TransportMasterNodeOperationAction.doExecute(final Request request, final ActionListener listener)->
TransportMasterNodeOperationAction.innerExecute(final Request request, final ActionListener listener, final ClusterStateObserver observer, final boolean retrying)->
先判断是否是本地master节点的请求
如果通过验证,将其交给线程池来处理.
TransportMasterNodeOperationAction.masterOperation(request, clusterService.state(), listener);->
TransportCreateIndexAction.masterOperation((final CreateIndexRequest request, final ClusterState state, final ActionListener listener)->
创建请求
(CreateIndexClusterStateUpdateRequest)updateRequest
MetaDataCreateIndexService.createIndexService.createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener)->
如果能获取锁,直接创建.
否则放入线程池,交由后台执行获取锁操作,什么时候获取到什么时候创建.
MetaDataCreateIndexService.createIndex(request, listener, mdLock);->
创建并完善请求.
(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask(String, Priority.URGENT, new AckedClusterStateUpdateTask(request, listener))->
首先生成UpdateTask,然后被提交的线程池中,然后被执行UpdateTask的run方法.
UpdateTask.run->
newClusterState = updateTask.execute(previousClusterState);->
MetaDataCreateIndexService.execute->
创建相关配置文件.
在indices中创建index 并且增加mapping
indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());->
创建相关索引,添加模块.
<-indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
初始化indexService
初始化mapperService
初始化索引解析服务indexQueryParserService
更新相关其他相关信息
<-MetaDataCreateIndexService.execute
<-newClusterState = updateTask.execute(previousClusterState);

Discovery.AckListener ackListener = new NoOpAckListener();
如果是主节点,然后发布状态.
更新状态.
                for (ClusterStateListener listener : preAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }
RoutingService.clusterChanged(Event)->
<-RoutingService.clusterChanged(Event)
IndicesClusterStateService.clusterChanged(Event)->
执行一系列与event相关的操作.
cleanFailedShards(event);
applyDeletedIndices(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
applyNewOrUpdatedShards(event);
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
<-IndicesClusterStateService.clusterChanged(Event)
NodeSettingsService.clusterChanged(Event)->
<-NodeSettingsService.clusterChanged(Event)
InternalClusterInfoService.clusterChanged(Event)->
<-InternalClusterInfoService.clusterChanged(Event)
RepositoriesService.clusterChanged(Event)->
<-RepositoriesService.clusterChanged(Event)
MetaDataUpdateSettingsService.clusterChanged(Event)->
<-MetaDataUpdateSettingsService.clusterChanged(Event)
RestoreService.clusterChanged(Event)->
<-RestoreService.clusterChanged(Event)
RiverRouter.clusterChanged(Event)->
<-RiverRouter.clusterChanged(Event)
org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)->
<-org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)
LocalGatewayAllocator.clusterChanged(Event)->
<-LocalGatewayAllocator.clusterChanged(Event)
IndexCache.clusterChanged(Event)->
<-IndexCache.clusterChanged(Event)
SnapshotsService.clusterChanged(Event)->
<-SnapshotsService.clusterChanged(Event)
IndicesStore.clusterChanged(Event)->
<-IndicesStore.clusterChanged(Event)
LocalGateway.clusterChanged(Event)->
<-LocalGateway.clusterChanged(Event)
GatewayService.clusterChanged(Event)->
<-GatewayService.clusterChanged(Event)
如果node有掉节点的情况,卸下节点.
获取发布相关请求.
//manual ack only from the master at the end of the publish
<-UpdateTask.run
<-(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask
<-MetaDataCreateIndexService.createIndexService.createIndex
<-TransportCreateIndexAction.masterOperation
<-TransportMasterNodeOperationAction.masterOperation
<-TransportMasterNodeOperationAction.innerExecute
<-TransportMasterNodeOperationAction.doExecute
<-(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener)
否则直接创建索引
TransportCreateIndexAction.innerExecute(request, listener);
<-TransportIndexAction.doExecute(final IndexRequest request, final ActionListener listener)
<-TransportAction.doExecute(request, listener);
<-(TransportAction)transportAction.execute
<-FilterClient.execute
<-BaseRequestHandler.execute
<-AbstraceClient.index
<-RestIndexAction.handleRequest
<-BaseRestHandler.handleRequest
<-RestHandler
<-RestController
<-HttpServer

后期会详细研究这一过程

点赞
收藏
评论区
推荐文章
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
ELK初探
EKL核心组成1.ElasticSearch开源分布式搜索引擎,他的特点是分布式、零配置、自动发现、索引自动分片,索引副本机制,restful接口,多数据源,自动搜索负载。安装ElasticSearch  高可用,易扩展,支持集群(cluster),分片和复制(sharding和replicas)验证启动:curlXGETht
Wesley13 Wesley13
3年前
ELK
一、基本概念1Node与ClusterElastic本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个Elastic实例。单个Elastic实例称为一个节点(node)。一组节点构成一个集群(cluster)。2IndexElastic会索引所有字段,经过处理后写入一个反向索引
Stella981 Stella981
3年前
Elasticsearch (1)
创建索引库ES的索引库是一个逻辑概念,它包括了分词列表及文档列表,同一个索引库中存储了相同类型的文档。它就相当于MySQL中的表,或相当于Mongodb中的集合。关于索引这个语:索引(名词):ES是基于Lucene构建的一个搜索服务,它要从索引库搜索符合条件索引数据。索引(动词):索引库刚创建起来是空的,将数据添加到索引库的过程称为索
Stella981 Stella981
3年前
Mongodb集群节点故障恢复场景分析(转)
一个适当配置的Mongodb分片集群是没有单点故障。本文描述了分片集群中存在的几种不同的潜在的节点故障场景,以及Mongodb对这些节点故障是怎么处理的。1、Mongos节点宕机一个Mongos进程应该运行在每一个应用程序服务器上,这个服务器应该独占这个Mongos进程,并且通过它与分片集群来通讯。Mongos进程不是持久化的,相反,
Stella981 Stella981
3年前
ElasticSearch学习笔记(二)
了解以下几个概念1\.索引index简单的可以理解为关系型数据库的中库或者表。一个elasticsearch集群中可以有多个索引。2\.文档docment可以理解为表中的行数据,表示一个对象。每个文档有一个唯一标识\_id,相当于关系数据库的主键。一个索引中可以有多个结构相同的文档。3\.域
Stella981 Stella981
3年前
Elasticsearch如何是如何实现分布式增删改查,一文搞懂
分布式文档存储路由文档到分片主分片和复制分片如何交互新建、索引和删除文档检索文档局部更新文档多文档模式福利路由文档到分片当你索引一个文档,它被存储在单独一个主分片上。Elasticsearch是如何知道文档属于哪个分片的呢?当你创建一个新文档,它是如何知道
Stella981 Stella981
3年前
Elasticsearch文档读写模型实现原理
ES系列基于ElasticSearch6.4.x版本。1、简介ElasticSearch的存储设计天生就是分布式的。每个索引被分成多个分片(默认每个索引含5个主分片(primaryshard)),每个主分片又可以有多个副本。当一个文档被添加或删除时(主分片中新增或删除),其对应的复制分片之间必须保持同步。如果我们不这样做,那么对于同一个文档的检索请
Stella981 Stella981
3年前
ElasticSearch底层原理浅析
基本概念索引(Index)ES将数据存储于一个或多个索引中,索引是具有类似特性的文档的集合。类比传统的关系型数据库领域来说,索引相当于SQL中的一个数据库,或者一个数据存储方案(schema)。索引由其名称(必须为全小写字符)进行标识,并通过引用此名称完成文档的创建、搜索、更新及删除操作。一个ES集群中可以按需创建任意数目的
京东云开发者 京东云开发者
11个月前
ElasticSearch集群灾难:别放弃,也许能再抢救一下 | 京东云技术团队
1前言Elasticsearch作为一个分布式搜索引擎,自身是高可用的;但也架不住一些特殊情况的发生,如:集群超过半数的master节点丢失,ES的节点无法形成一个集群,进而导致集群不可用;索引shard的文件损坏,分片无法被正常恢复,进而导致索引无法正常