elasticsearch节点的transport请求发送处理分析-亚博电竞手机版
目录
- transport请求的发送和处理过程
- request的发送过程
- request的接受过程
- request和response是如何被处理
- request的处理
- response的处理过程
- 最后总结
transport请求的发送和处理过程
前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。
cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头header_size = 2 4 8 1 4,以'e','s'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。
所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。
request的发送过程
代码在nettytransport中如下所示:
public void sendrequest(final discoverynode node, final long requestid, final string action, final transportrequest request, transportrequestoptions options) throws ioexception, transportexception { //参数说明:node发送的目的节点,requestid请求id,action action名称,request请求,options包括以下几种操作 recovery,bulk,reg,state,ping; channel targetchannel = nodechannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇) if (compress) { options.withcompress(true); } byte status = 0; //设置status 包括以下几种status_reqres = 1 << 0; status_error = 1 << 1; status_compress = 1 << 2; status = transportstatus.setrequest(status); releasablebytesstreamoutput bstream = new releasablebytesstreamoutput(bigarrays);//初始写出流 boolean addedreleaselistener = false; try { addedreleaselistener = true; transportserviceadapter.onrequestsent(node, requestid, action, request, options); } finally { if (!addedreleaselistener) { releasables.close(bstream.bytes()); } } }
以上就是request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里transportrequest是一个抽象类,它继承了transportmessage同时实现了streamable接口。cluster中对它的实现非常多,各个功能都有相应的request,这里就不一一列举,后面的代码分析中会时常涉及。
request的接受过程
request发送只是transport的一部分功能,有发送就要有接收,这样transport的功能才完整。接下来就是对接收过程的分析。上一篇中简单介绍过netty的使用,message的处理是通过messagehandler处理,因此nettytransport的信息处理逻辑都在messagechannelhandler的messagereceived()方法中,代码如下所示:
public void messagereceived(channelhandlercontext ctx, messageevent e) throws exception { transports.asserttransportthread(); object m = e.getmessage(); if (!(m instanceof channelbuffer)) {//非buffer之间返回 ctx.sendupstream(e); return; } //解析message头 channelbuffer buffer = (channelbuffer) m; int size = buffer.getint(buffer.readerindex() - 4); transportserviceadapter.received(size 6); // we have additional bytes to read, outside of the header boolean hasmessagebytestoread = (size - (nettyheader.header_size - 6)) != 0; int markedreaderindex = buffer.readerindex(); int expectedindexreader = markedreaderindex size; // netty always copies a buffer, either in nioworker in its read handler, where it copies to a fresh // buffer, or in the cumlation buffer, which is cleaned each time streaminput streamin = channelbufferstreaminputfactory.create(buffer, size); //读取信息头中的几个重要元数据 long requestid = buffer.readlong(); byte status = buffer.readbyte(); version version = version.fromid(buffer.readint()); streaminput wrappedstream; ………… if (transportstatus.isrequest(status)) {//处理请求 string action = handlerequest(ctx.getchannel(), wrappedstream, requestid, version); if (buffer.readerindex() != expectedindexreader) { if (buffer.readerindex() < expectedindexreader) { logger.warn("message not fully read (request) for [{}] and action [{}], resetting", requestid, action); } else { logger.warn("message read past expected size (request) for [{}] and action [{}], resetting", requestid, action); } buffer.readerindex(expectedindexreader); } } else {//处理响应 transportresponsehandler handler = transportserviceadapter.onresponsereceived(requestid); // ignore if its null, the adapter logs it if (handler != null) { if (transportstatus.iserror(status)) { handlerresponseerror(wrappedstream, handler); } else { handleresponse(ctx.getchannel(), wrappedstream, handler); } } else { // if its null, skip those bytes buffer.readerindex(markedreaderindex size); } ………… wrappedstream.close(); }
以上就是信息处理逻辑,这个方法基础自netty的simplechannelupstreamhandler类。作为messagehandler会在client和server启动时加入到handler链中,在信息到达后netty会自动调用handler链依次处理。这是netty的内容,就不详细说明,请参考netty文档。
request和response是如何被处理
request的处理
代码如下所示:
protected string handlerequest(channel channel, streaminput buffer, long requestid, version version) throws ioexception { final string action = buffer.readstring();//读出action的名字 transportserviceadapter.onrequestreceived(requestid, action); final nettytransportchannel transportchannel = new nettytransportchannel(transport, transportserviceadapter, action, channel, requestid, version, profilename); try { final transportrequesthandler handler = transportserviceadapter.handler(action, version);//获取处理该信息的handler if (handler == null) { throw new actionnotfoundtransportexception(action); } final transportrequest request = handler.newinstance(); request.remoteaddress(new inetsockettransportaddress((inetsocketaddress) channel.getremoteaddress())); request.readfrom(buffer); if (handler.executor() == threadpool.names.same) { //noinspection unchecked handler.messagereceived(request, transportchannel);//使用该handler处理信息。 } else { threadpool.executor(handler.executor()).execute(new requesthandler(handler, request, transportchannel, action)); } } catch (throwable e) { try { transportchannel.sendresponse(e); } catch (ioexception e1) { logger.warn("failed to send error message back to client for action [" action "]", e); logger.warn("actual exception", e1); } } return action; }
几个关键部分在代码中进行了标注。这里仍旧不能看到请求是如何处理的。因为cluster中的请求各种各样,如ping,discovery,index等等,因此不可能使用同一种处理方式。因此request最终又被提交给handler处理。每个功能请求都实现了自己的handler,当请求被提交给handler时会做对应的处理。这里再说一下transportserviceadapter,消息的处理都是通过它适配转发完成。request的完整处理流程是:messagereceived()方法收到信息判断是request会将其转发到transportserviceadapter的handler方法,handler方法查找对应的requesthandler,使用将信息转发给该handler进行处理。这里就不举例说明,在后面的discover分析中我们会看到发现,ping等请求的处理过程。
response的处理过程
response通过handleresponse方法进行处理,代码如下:
protected void handleresponse(channel channel, streaminput buffer, final transportresponsehandler handler) { final transportresponse response = handler.newinstance(); response.remoteaddress(new inetsockettransportaddress((inetsocketaddress) channel.getremoteaddress())); response.remoteaddress(); try { response.readfrom(buffer); } catch (throwable e) { handleexception(handler, new transportserializationexception("failed to deserialize response of type [" response.getclass().getname() "]", e)); return; } try { if (handler.executor() == threadpool.names.same) { //noinspection unchecked handler.handleresponse(response);//转发给对应的handler } else { threadpool.executor(handler.executor()).execute(new responsehandler(handler, response)); } } catch (throwable e) { handleexception(handler, new responsehandlerfailuretransportexception(e)); } }
response的处理过程跟request很类似。每个request都会对应一个handler和一个response的处理handler,会在时候的时候注册到transportservice中。请求到达时根据action名称获取到handler处理request,根据requestid获取对应的response handler进行响应。
最后总结
nettytransport的信息处理过程:信息通过request方法发送到目标节点,目标节点的messagehandler会受到该信息,确定是request还是response,将他们分别转发给transportserviceadapter,transportserviceadapter会查询到对应的handler,信息最终会被转发给对应的handler处理并反馈。
对于nettytransport信息发送的分析就到这里,在下一篇的cluster discovery分析中,我们会看到信息发送及处理的具体过程,希望大家以后多多支持我们!