elasticsearch源码分析index action实现方式-亚博电竞手机版

目录

  • action的作用
  • transportaction的类图
  • operationtransporthandler的代码
  • primary操作的方法
  • 总结

action的作用

上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。

再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(indexaction)和transport*action(transportindexaction),前者类中会有一个实例(indexaction instance = new indexaction())这个实例用于client绑定对应的transportaction(registeraction(indexaction.instance, transportindexaction.class)),绑定过程发送在actionmoduel中。

另外在action类中还会定义一个action的名字(string name = "indices:data/write/index")这个名字用于transportservice绑定对于的handle,用于处理nettytransport接收到的信息。transportaction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点能否处理,如果能够处理则调用相关的方法处理得到结果返回,否则将通过nettytransport转发该请求到对应的node进行处理。所有的transport的结构都是这种类型。

transportaction的类图

首先看一下transportaction的类图,所的transport*action都继承自于它。

它主要由两个方法execute和doexecute,execute方法有两种实现,第一种实现需要自行添加actionlistener。最终的逻辑都在doexecute方法中,这个方法在各个功能模块中实现。以下是transportindexaction的继承关系:

实现上由于功能划分的原因,transportindexaction直接继承自transpshardreplicationoperationaction,这个抽象类中的方法是所有需要操作shard副本的功能action的父,因此它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下operationtransporthandler,replicaoperationtransporthandler及asyncshardoperationaction三个子类。

operationtransporthandler的代码

如下所示:

class operationtransporthandler extends basetransportrequesthandler { //继承自basetransportrequesthanlder ……………… @override public void messagereceived(final request request, final transportchannel channel) throws exception { // no need to have a threaded listener since we just send back a response request.listenerthreaded(false); // if we have a local operation, execute it on a thread since we don't spawn request.operationthreaded(true);       //调用transport的execute方法,通过channel返回结果 execute(request, new actionlistener() { @override public void onresponse(response result) { try { channel.sendresponse(result); } catch (throwable e) { onfailure(e); } } @override public void onfailure(throwable e) { try { channel.sendresponse(e); } catch (throwable e1) { logger.warn("failed to send response for " actionname, e1); } } }); }

看过nettytransport请求发送和处理的同学一定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求通过nettytransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,可以看到它调用execute方法来处理。它的注册时在transportshardreplicationoperationaction构造函数中完成的。

知道了operationtransporthandler,replicaoperationtransporthandler就好理解了它的实现方式跟前者完全一样,对应的action名称加了一个“[r]”,它的作用是处理需要在副本上进行的操作,代码如下所示:

class replicaoperationtransporthandler extends basetransportrequesthandler { …………………… @override public void messagereceived(final replicaoperationrequest request, final transportchannel channel) throws exception { try { shardoperationonreplica(request); } catch (throwable t) { failreplicaifneeded(request.shardid.getindex(), request.shardid.id(), t); throw t; } channel.sendresponse(transportresponse.empty.instance); } }

可以看到代码结构非常像,只是调用了副本操作的方法shardoperationonreplica,这个方法在这transportshardreplicationoperationaction中是抽象的,它的实现在各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。

分析完这两个handle是不是对于action的处理过程有了一定的眉目了呢?但是这才是冰山一角,这两个handler是用来接收来自其它节点的请求,如果请求的正好是本节点该如何处理呢?这些逻辑都在asyncshardoperationaction类中。首先看一下它的内部结构:

因为transportshardreplicationoperationaction的所有子类都是对索引的修改,会引起数据不一致,因此它的操作流程都是现在primaryshard上操作然后是replicashard上(request.routing() == null) { throw new routingmissingexception(shardrequest.shardid.getindex(), request.type(), request.id()); } }       //调用indexserice执行对应的index操作 indexservice indexservice = indicesservice.indexservicesafe(shardrequest.shardid.getindex()); indexshard indexshard = indexservice.shardsafe(shardrequest.shardid.id()); sourcetoparse sourcetoparse = sourcetoparse.source(sourcetoparse.origin.primary, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; boolean created; try { engine.indexingoperation op; if (request.optype() == indexrequest.optype.index) { engine.index index = indexshard.prepareindex(sourcetoparse, request.version(), request.versiontype(), engine.operation.origin.primary, request.canhaveduplicates()); if (index.parseddoc().mappingsmodified()) { mappingupdatedaction.updatemappingonmaster(shardrequest.shardid.getindex(), index.docmapper(), indexservice.indexuuid()); } indexshard.index(index); version = index.version(); op = index; created = index.created(); } else { engine.create create = indexshard.preparecreate(sourcetoparse, request.version(), request.versiontype(), engine.operation.origin.primary, request.canhaveduplicates(), request.autogeneratedid()); if (create.parseddoc().mappingsmodified()) { mappingupdatedaction.updatemappingonmaster(shardrequest.shardid.getindex(), create.docmapper(), indexservice.indexuuid()); } indexshard.create(create); version = create.version(); op = create; created = true; } if (request.refresh()) { try { xvrfk indexshard.refresh("refresh_flag_index"); } catch (throwable e) { // ignore } } // update the version on the request, so it will be used for the replicas request.version(version); request.versiontype(request.versiontype().versiontypeforreplicationandrecovery()); assert request.versiontype().validateversionforwrites(request.version()); indexresponse response = new indexresponse(shardrequest.shardid.getindex(), request.type(), request.id(), version, created); return new primaryresponse<>(shardrequest.request, response, op); } catch (writefailureexception e) { if (e.getmappingtypetoupdate() != null) { documentmapper docmapper = indexservice.mapperservice().documentmapper(e.getmappingtypetoupdate()); if (docmapper != null) { mappingupdatedaction.updatemappingonmaster(indexservice.index().name(), docmapper, indexservice.indexuuid()); } } throw e.getcause(); } }

上面的代码就是index的执行过程,这一过程涉及到index的底层操作,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操作。副本可能有多个,因此首先调用了performreplicas方法,在这个方法中首先开始监听集群的状态,然后便利所有的副本进行处理,如果是异步则加入一个listener,否则同步执行返回结果。最后调用performreplica,在该方法中调用外部类的抽象方法shardoperationonreplica。 这一过程比较简单,这里就不再贴代码,有兴趣可以参考相关源码。

总结

这里以transportindexaction为例分析了tansportaction的结构层次。它在transportaction直接还有一层那就是transportshardreplicationoperationaction,这个类是actionsupport包中的一个,这个包把所有的子操作方法做了进一步的抽象,抽象出几个大类放到了这里,所有其它子功能很多都继承自这。这个包会在后面有详细分析。

以上就是elasticsearch源码分析index action实现方式的详细内容,更多关于elasticsearch源码分析index action的资料请关注亚博电竞手机版其它相关文章!

展开全文
内容来源于互联网和用户投稿,文章中一旦含有亚博电竞手机版的联系方式务必识别真假,本站仅做信息展示不承担任何相关责任,如有侵权或涉及法律问题请联系亚博电竞手机版删除

最新文章

网站地图