elasticsearch源码分析index action实现方式-亚博电竞手机版
目录
- action的作用
- transportaction的类图
- operationtransporthandler的代码
- primary操作的方法
- 总结
action的作用
上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。
再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(indexaction)和transport*action(transportindexaction),前者类中会有一个实例(indexaction instance = new indexaction())这个实例用于client绑定对应的transportaction(registeraction(indexaction.instance, transportindexaction.class)),绑定过程发送在actionmoduel中。
另外在action类中还会定义一个action的名字(string name = "indices:data/write/index")这个名字用于transportservice绑定对于的handle,用于处理nettytransport接收到的信息。transportaction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点能否处理,如果能够处理则调用相关的方法处理得到结果返回,否则将通过nettytransport转发该请求到对应的node进行处理。所有的transport的结构都是这种类型。
transportaction的类图
首先看一下transportaction的类图,所的transport*action都继承自于它。
它主要由两个方法execute和doexecute,execute方法有两种实现,第一种实现需要自行添加actionlistener。最终的逻辑都在doexecute方法中,这个方法在各个功能模块中实现。以下是transportindexaction的继承关系:
实现上由于功能划分的原因,transportindexaction直接继承自transpshardreplicationoperationaction,这个抽象类中的方法是所有需要操作shard副本的功能action的父,因此它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下operationtransporthandler,replicaoperationtransporthandler及asyncshardoperationaction三个子类。
operationtransporthandler的代码
如下所示:
class operationtransporthandler extends basetransportrequesthandler
看过nettytransport请求发送和处理的同学一定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求通过nettytransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,可以看到它调用execute方法来处理。它的注册时在transportshardreplicationoperationaction构造函数中完成的。
知道了operationtransporthandler,replicaoperationtransporthandler就好理解了它的实现方式跟前者完全一样,对应的action名称加了一个“[r]”,它的作用是处理需要在副本上进行的操作,代码如下所示:
class replicaoperationtransporthandler extends basetransportrequesthandler
可以看到代码结构非常像,只是调用了副本操作的方法shardoperationonreplica,这个方法在这transportshardreplicationoperationaction中是抽象的,它的实现在各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。
分析完这两个handle是不是对于action的处理过程有了一定的眉目了呢?但是这才是冰山一角,这两个handler是用来接收来自其它节点的请求,如果请求的正好是本节点该如何处理呢?这些逻辑都在asyncshardoperationaction类中。首先看一下它的内部结构:
因为transportshardreplicationoperationaction的所有子类都是对索引的修改,会引起数据不一致,因此它的操作流程都是现在primaryshard上操作然后是replicashard上(request.routing() == null) { throw new routingmissingexception(shardrequest.shardid.getindex(), request.type(), request.id()); } } //调用indexserice执行对应的index操作 indexservice indexservice = indicesservice.indexservicesafe(shardrequest.shardid.getindex()); indexshard indexshard = indexservice.shardsafe(shardrequest.shardid.id()); sourcetoparse sourcetoparse = sourcetoparse.source(sourcetoparse.origin.primary, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; boolean created; try { engine.indexingoperation op; if (request.optype() == indexrequest.optype.index) { engine.index index = indexshard.prepareindex(sourcetoparse, request.version(), request.versiontype(), engine.operation.origin.primary, request.canhaveduplicates()); if (index.parseddoc().mappingsmodified()) { mappingupdatedaction.updatemappingonmaster(shardrequest.shardid.getindex(), index.docmapper(), indexservice.indexuuid()); } indexshard.index(index); version = index.version(); op = index; created = index.created(); } else { engine.create create = indexshard.preparecreate(sourcetoparse, request.version(), request.versiontype(), engine.operation.origin.primary, request.canhaveduplicates(), request.autogeneratedid()); if (create.parseddoc().mappingsmodified()) { mappingupdatedaction.updatemappingonmaster(shardrequest.shardid.getindex(), create.docmapper(), indexservice.indexuuid()); } indexshard.create(create); version = create.version(); op = create; created = true; } if (request.refresh()) { try { xvrfk indexshard.refresh("refresh_flag_index"); } catch (throwable e) { // ignore } } // update the version on the request, so it will be used for the replicas request.version(version); request.versiontype(request.versiontype().versiontypeforreplicationandrecovery()); assert request.versiontype().validateversionforwrites(request.version()); indexresponse response = new indexresponse(shardrequest.shardid.getindex(), request.type(), request.id(), version, created); return new primaryresponse<>(shardrequest.request, response, op); } catch (writefailureexception e) { if (e.getmappingtypetoupdate() != null) { documentmapper docmapper = indexservice.mapperservice().documentmapper(e.getmappingtypetoupdate()); if (docmapper != null) { mappingupdatedaction.updatemappingonmaster(indexservice.index().name(), docmapper, indexservice.indexuuid()); } } throw e.getcause(); } }
上面的代码就是index的执行过程,这一过程涉及到index的底层操作,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操作。副本可能有多个,因此首先调用了performreplicas方法,在这个方法中首先开始监听集群的状态,然后便利所有的副本进行处理,如果是异步则加入一个listener,否则同步执行返回结果。最后调用performreplica,在该方法中调用外部类的抽象方法shardoperationonreplica。 这一过程比较简单,这里就不再贴代码,有兴趣可以参考相关源码。
总结
这里以transportindexaction为例分析了tansportaction的结构层次。它在transportaction直接还有一层那就是transportshardreplicationoperationaction,这个类是actionsupport包中的一个,这个包把所有的子操作方法做了进一步的抽象,抽象出几个大类放到了这里,所有其它子功能很多都继承自这。这个包会在后面有详细分析。
以上就是elasticsearch源码分析index action实现方式的详细内容,更多关于elasticsearch源码分析index action的资料请关注亚博电竞手机版其它相关文章!