1 基本概念
集群: 一个集群有一个或多个节点组织在一起,并将数据组织在一起,提供索引和搜索服务.
节点:一个节点是一个集群中的服务器,提供存储数据,提供搜索服务.
索引:文档的逻辑的集合
分片:一个逻辑索引有若干分片,其中一个分片被设置为主分片.分片为索引的存储位置. 会涉及到分布式问题.
类型:文档的类型
文档:与lucene中的document类似.
如果集群状态发生改变,发生改变状态的节点,先通知其他节点更新状态,最后才更新本地节点
的状态.因此在提交版本信息同步时,都会涉及到这个过程.
2 提交请求初次创建索引
概述:
解析请求,根据请求创建index和mapping,并提交版本信息到集群.然后根据配置创建0,1,2,3
的shard(分片)然后更新版本信息到集群.最后创建shard(分片)4 然后更新版本信息到集群.
最后根据请求更新已有的mapper信息,最后发布到集群中各个节点,然后更新版本信息.
curl -XPOST 'localhost:9200/twitter/tweet/1' -d '{"name":"parker","age":18}'
源码阅读:
HttpServer.internalDispatchRequest (HttpRequest,HttpChannel) ->
(RestController)restController.dispatchRequest(HttpRequest,HttpChannel) ->
RestController.executeHandler(HttpRequest,HttpChannel)->
获取RestHandler->
handler.handleRequest(request, channel)->
BaseRestHandler.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
RestIndexAction.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
创建IndexRequest对象.
client.index(indexRequest,RestBuilderListener)->
AbstraceClient.index(final IndexRequest request, final ActionListener
execute(IndexAction.INSTANCE, request, listener)->
BaseRestHandler.execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener
处理前的准备工作copyHeaderSAndContext(request, restRequest, headers)->
<-copyHeaderSAndContext
FilterClient.execute(action, request, listener)->
(TransportAction)transportAction.execute(request, listener)->
TransportAction.doExecute(request, listener)->
TransportIndexAction.doExecute(final IndexRequest request, final ActionListener
判断是否需要新建索引,如果需要则新建,否则直接建立索引.
如果需要创建索引,先构造创建索引的请求.然后创建索引
(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener
TransportMasterNodeOperationAction.doExecute(final Request request, final ActionListener
TransportMasterNodeOperationAction.innerExecute(final Request request, final ActionListener
先判断是否是本地master节点的请求
如果通过验证,将其交给线程池来处理.
TransportMasterNodeOperationAction.masterOperation(request, clusterService.state(), listener);->
TransportCreateIndexAction.masterOperation((final CreateIndexRequest request, final ClusterState state, final ActionListener
创建请求
(CreateIndexClusterStateUpdateRequest)updateRequest
MetaDataCreateIndexService.createIndexService.createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener
如果能获取锁,直接创建.
否则放入线程池,交由后台执行获取锁操作,什么时候获取到什么时候创建.
MetaDataCreateIndexService.createIndex(request, listener, mdLock);->
创建并完善请求.
(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask(String, Priority.URGENT, new AckedClusterStateUpdateTask
首先生成UpdateTask,然后被提交的线程池中,然后被执行UpdateTask的run方法.
UpdateTask.run->
newClusterState = updateTask.execute(previousClusterState);->
MetaDataCreateIndexService.execute->
创建相关配置文件.
在indices中创建index 并且增加mapping
indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());->
创建相关索引,添加模块.
<-indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
初始化indexService
初始化mapperService
初始化索引解析服务indexQueryParserService
更新相关其他相关信息
<-MetaDataCreateIndexService.execute
<-newClusterState = updateTask.execute(previousClusterState);
Discovery.AckListener ackListener = new NoOpAckListener();
如果是主节点,然后发布状态.
更新状态.
for (ClusterStateListener listener : preAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
RoutingService.clusterChanged(Event)->
<-RoutingService.clusterChanged(Event)
IndicesClusterStateService.clusterChanged(Event)->
执行一系列与event相关的操作.
cleanFailedShards(event);
applyDeletedIndices(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
applyNewOrUpdatedShards(event);
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
<-IndicesClusterStateService.clusterChanged(Event)
NodeSettingsService.clusterChanged(Event)->
<-NodeSettingsService.clusterChanged(Event)
InternalClusterInfoService.clusterChanged(Event)->
<-InternalClusterInfoService.clusterChanged(Event)
RepositoriesService.clusterChanged(Event)->
<-RepositoriesService.clusterChanged(Event)
MetaDataUpdateSettingsService.clusterChanged(Event)->
<-MetaDataUpdateSettingsService.clusterChanged(Event)
RestoreService.clusterChanged(Event)->
<-RestoreService.clusterChanged(Event)
RiverRouter.clusterChanged(Event)->
<-RiverRouter.clusterChanged(Event)
org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)->
<-org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)
LocalGatewayAllocator.clusterChanged(Event)->
<-LocalGatewayAllocator.clusterChanged(Event)
IndexCache.clusterChanged(Event)->
<-IndexCache.clusterChanged(Event)
SnapshotsService.clusterChanged(Event)->
<-SnapshotsService.clusterChanged(Event)
IndicesStore.clusterChanged(Event)->
<-IndicesStore.clusterChanged(Event)
LocalGateway.clusterChanged(Event)->
<-LocalGateway.clusterChanged(Event)
GatewayService.clusterChanged(Event)->
<-GatewayService.clusterChanged(Event)
如果node有掉节点的情况,卸下节点.
获取发布相关请求.
//manual ack only from the master at the end of the publish
<-UpdateTask.run
<-(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask
<-MetaDataCreateIndexService.createIndexService.createIndex
<-TransportCreateIndexAction.masterOperation
<-TransportMasterNodeOperationAction.masterOperation
<-TransportMasterNodeOperationAction.innerExecute
<-TransportMasterNodeOperationAction.doExecute
<-(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener
否则直接创建索引
TransportCreateIndexAction.innerExecute(request, listener);
<-TransportIndexAction.doExecute(final IndexRequest request, final ActionListener
<-TransportAction.doExecute(request, listener);
<-(TransportAction)transportAction.execute
<-FilterClient.execute
<-BaseRequestHandler.execute
<-AbstraceClient.index
<-RestIndexAction.handleRequest
<-BaseRestHandler.handleRequest
<-RestHandler
<-RestController
<-HttpServer
后期会详细研究这一过程