The transport startup process: the foundation of inter-node communication in Elasticsearch
Contents
- Preface
- transport
- Starting the ServerBootstrap
- How to connect to a node
- Code of the connection method
- Summary
Preface
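The previous article analyzed some of the elements of a cluster. The following chapters analyze how a cluster operates in detail. This one starts with transport, the foundation of communication within a cluster. It has two implementations. One is NettyTransport, built on netty and used mainly for communication between nodes. The other is LocalTransport, used mainly for communication between nodes in the same JVM. Because it only simulates a network inside one JVM, LocalTransport is very simple to implement and of very limited practical use, so it is not covered further here. The focus of this article is NettyTransport.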
transport
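As its name suggests, transport is the basic channel for cluster communication. Cluster state information and search and index requests alike are sent through transport. Elasticsearch defines Transport, TransportMessage, TransportChannel, TransportRequest, TransportResponse and the other base interfaces it needs. This article focuses on Transport and introduces the other interfaces along the way. First, look at the definition of the Transport interface:
[Figure: definition of the Transport interface]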
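NettyTransport implements this interface. Before analyzing NettyTransport, a brief word on how netty is used. Using netty requires three pieces: ServerBootstrap, ClientBootstrap (v3.x) and MessageHandler. ServerBootstrap starts the server, ClientBootstrap starts the client and connects to the server, and MessageHandler is where the message handling logic, that is, the business logic, lives. For further details, see the official netty documentation.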
Starting the ServerBootstrap
NettyTransport starts the ServerBootstrap and the ClientBootstrap in its doStart() method and binds the IP. The code is as follows:
protected void doStart() throws ElasticsearchException {
    clientBootstrap = createClientBootstrap(); // start the client according to the configuration
    …… // unrelated code omitted
    createServerBootstrap(name, mergedSettings); // start the server side
    bindServerBootstrap(name, mergedSettings); // bind the IP
}
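Every node needs to both send and receive, so both sides have to be started. The client and the server are each started in their own method, and the startup itself is simply the netty startup procedure; readers who are interested can look at those methods. bindServerBootstrap(name, mergedSettings) binds the local IP and port to netty and also sets the export host (I have not worked out what the export host is actually for, nor found the related binding; this needs further study).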
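While the client and the server are being started, the MessageHandler is injected into the ChannelPipeline. At this point startup is complete, but the client has not connected to any server. Connections to other nodes are only made after the node itself has started.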
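To make the idea of injecting a handler into a pipeline concrete, here is a minimal sketch in plain Java. It does not use the netty API: SimplePipeline, addLast and fire are hypothetical names chosen for illustration, and messages are plain strings. It only shows that handlers are registered once at startup and that every later message passes through them in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a channel pipeline: an ordered list of handlers.
class SimplePipeline {
    private final List<UnaryOperator<String>> handlers = new ArrayList<>();

    // Register a handler at the end of the pipeline (done once, at startup).
    SimplePipeline addLast(UnaryOperator<String> handler) {
        handlers.add(handler);
        return this;
    }

    // Pass a message through every handler in registration order.
    String fire(String message) {
        String current = message;
        for (UnaryOperator<String> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }
}

class PipelineDemo {
    public static void main(String[] args) {
        SimplePipeline pipeline = new SimplePipeline()
                .addLast(String::trim)               // a "decoder"-like step
                .addLast(msg -> "handled:" + msg);   // the "message handler" step
        System.out.println(pipeline.fire("  ping  ")); // prints "handled:ping"
    }
}
```

Building the pipeline does not open any connection, which matches the point above: startup wires up the handlers, and connecting happens later.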
How to connect to a node
The code of the method is as follows:
public void connectToNode(DiscoveryNode node, boolean light) {
    // the transport module must already be started
    if (!lifecycle.started()) {
        throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
    }
    // take the read lock: a node can connect to many other nodes, so a read lock is used here
    globalLock.readLock().lock();
    try {
        // take a lock keyed on node.id(), which guarantees that a connection to each node is set up only once
        connectionLock.acquire(node.id());
        try {
            if (!lifecycle.started()) {
                throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
            }
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null) {
                return;
            }
            try {
                if (light) { // light means a single channel is opened to the node and shared by all five connection types (described below)
                    nodeChannels = connectToChannelsLight(node);
                } else {
                    nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                    try {
                        connectToChannels(nodeChannels, node);
                    } catch (Throwable e) {
                        logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                        nodeChannels.close();
                        throw e;
                    }
                }
                // we acquire a connection lock, so no way there is an existing connection
                connectedNodes.put(node, nodeChannels);
                if (logger.isDebugEnabled()) {
                    logger.debug("connected to node [{}]", node);
                }
                transportServiceAdapter.raiseNodeConnected(node);
            } catch (ConnectTransportException e) {
                throw e;
            } catch (Exception e) {
                throw new ConnectTransportException(node, "general node connection failure", e);
            }
        } finally {
            connectionLock.release(node.id());
        }
    } finally {
        globalLock.readLock().unlock();
    }
}
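If the connection is not a light one, there are five kinds of connections between each server and client, and each kind carries a different type of work.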
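The locking in connectToNode is worth isolating. The following is a minimal sketch of the same pattern using only JDK classes: a shared read lock so that connections to different nodes can proceed concurrently, a per-node lock so that each node is connected only once, and a check for an existing entry after the lock is taken. ConnectionRegistry and its members are hypothetical names, node ids and channels are plain strings, and disconnectAll is an assumption added only to show what the read lock protects against; it is not taken from the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry; the names are illustrative, not Elasticsearch classes.
class ConnectionRegistry {
    private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
    private final ConcurrentMap<String, ReentrantLock> nodeLocks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, String> connectedNodes = new ConcurrentHashMap<>();
    final AtomicInteger opened = new AtomicInteger(); // counts connections actually opened

    // Returns true if this call opened the connection, false if one already existed.
    boolean connectToNode(String nodeId) {
        globalLock.readLock().lock(); // shared: connects to different nodes do not block each other
        try {
            ReentrantLock nodeLock = nodeLocks.computeIfAbsent(nodeId, id -> new ReentrantLock());
            nodeLock.lock(); // exclusive per node id: only one connect attempt at a time
            try {
                if (connectedNodes.containsKey(nodeId)) {
                    return false; // already connected, nothing to do
                }
                connectedNodes.put(nodeId, "channels-for-" + nodeId);
                opened.incrementAndGet();
                return true;
            } finally {
                nodeLock.unlock();
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

    // Assumption for illustration: a writer excludes every in-flight connect.
    void disconnectAll() {
        globalLock.writeLock().lock();
        try {
            connectedNodes.clear();
        } finally {
            globalLock.writeLock().unlock();
        }
    }
}

class ConnectionRegistryDemo {
    public static void main(String[] args) {
        ConnectionRegistry registry = new ConnectionRegistry();
        System.out.println(registry.connectToNode("node-1")); // prints "true"
        System.out.println(registry.connectToNode("node-1")); // prints "false"
    }
}
```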
Code of the connection method
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    // five kinds of connections, each kind serving a different kind of cluster operation
    ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
    ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
    ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
    ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
    ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    // try to establish the connections
    for (int i = 0; i < connectRecovery.length; i++) {
        connectRecovery[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectBulk.length; i++) {
        connectBulk[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectReg.length; i++) {
        connectReg[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectState.length; i++) {
        connectState[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectPing.length; i++) {
        connectPing[i] = clientBootstrap.connect(address);
    }
    // take the channel of each connection and store it in the matching channels array for later use
    try {
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectRecovery[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
            }
            nodeChannels.recovery[i] = connectRecovery[i].getChannel();
            nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectBulk[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
            }
            nodeChannels.bulk[i] = connectBulk[i].getChannel();
            nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectReg[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
            }
            nodeChannels.reg[i] = connectReg[i].getChannel();
            nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectState[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
            }
            nodeChannels.state[i] = connectState[i].getChannel();
            nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectPing[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
            }
            nodeChannels.ping[i] = connectPing[i].getChannel();
            nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        if (nodeChannels.recovery.length == 0) {
            if (nodeChannels.bulk.length > 0) {
                nodeChannels.recovery = nodeChannels.bulk;
            } else {
                nodeChannels.recovery = nodeChannels.reg;
            }
        }
        if (nodeChannels.bulk.length == 0) {
            nodeChannels.bulk = nodeChannels.reg;
        }
    } catch (RuntimeException e) {
        // clean the futures
        for (ChannelFuture future : ImmutableList. ……
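The method above works in two phases: it first issues every connect without waiting, and only then waits for each result with a budget of 1.5 times the connect timeout. A minimal sketch of that two-phase shape, using CompletableFuture from the JDK instead of netty's ChannelFuture, could look like the following. ParallelConnect, connectAsync and connectAll are hypothetical names, the "connect" merely produces a string, and the cancel-on-failure step is only my reading of the "clean the futures" comment, not the original cleanup code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ParallelConnect {
    // Hypothetical stand-in for an asynchronous connect; it just yields a channel name.
    static CompletableFuture<String> connectAsync(String address, int index) {
        return CompletableFuture.supplyAsync(() -> address + "#" + index);
    }

    static List<String> connectAll(String address, int count, long connectTimeoutMillis) {
        // Phase 1: start every connect without waiting for any of them.
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pending.add(connectAsync(address, i));
        }
        // Phase 2: wait for each result, allowing 1.5x the configured timeout.
        long waitMillis = (long) (connectTimeoutMillis * 1.5);
        List<String> channels = new ArrayList<>();
        try {
            for (CompletableFuture<String> future : pending) {
                channels.add(future.get(waitMillis, TimeUnit.MILLISECONDS));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            cancelAll(pending);
            throw new IllegalStateException("interrupted while connecting", e);
        } catch (ExecutionException | TimeoutException e) {
            cancelAll(pending);
            throw new IllegalStateException("connect failed or timed out", e);
        }
        return channels;
    }

    // Assumed cleanup: cancel whatever is still pending so nothing is left dangling.
    private static void cancelAll(List<CompletableFuture<String>> pending) {
        for (CompletableFuture<String> future : pending) {
            future.cancel(true);
        }
    }
}

class ParallelConnectDemo {
    public static void main(String[] args) {
        // The address and counts are made-up example values.
        System.out.println(ParallelConnect.connectAll("10.0.0.1:9300", 3, 1000));
    }
}
```

Issuing all connects before waiting means the total wait is bounded by the slowest connection rather than by the sum of all of them.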
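That is how nodes establish connections: between each client and server pair, a fixed number of connections of each type is created. The connections are kept separate because different operations consume different amounts of resources and arrive at different rates. For requests that are cheap but frequent, such as ping, more connections can be opened to ensure concurrency. For expensive operations such as bulk, fewer connections should be opened so that the machine is not overwhelmed. Disconnecting from a node is the process of releasing the corresponding channels. It is not described in detail here; see the relevant source code.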
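The fallback at the end of connectToChannels is easy to miss: when a type is configured with zero connections, it borrows another type's channels. The sketch below reproduces only that rule with plain arrays of strings. ChannelGroups is a hypothetical class, and the connection counts in the demo are made-up values, not Elasticsearch defaults.

```java
import java.util.Arrays;

// Hypothetical holder for the five channel groups; strings stand in for channels.
class ChannelGroups {
    String[] recovery;
    String[] bulk;
    String[] reg;
    String[] state;
    String[] ping;

    ChannelGroups(int recoveryCount, int bulkCount, int regCount, int stateCount, int pingCount) {
        recovery = open("recovery", recoveryCount);
        bulk = open("bulk", bulkCount);
        reg = open("reg", regCount);
        state = open("state", stateCount);
        ping = open("ping", pingCount);
        applyFallbacks();
    }

    // Create 'count' placeholder channels named after their type.
    private static String[] open(String type, int count) {
        String[] channels = new String[count];
        for (int i = 0; i < count; i++) {
            channels[i] = type + "-" + i;
        }
        return channels;
    }

    // Same order as in connectToChannels: recovery falls back to bulk, then reg; bulk falls back to reg.
    private void applyFallbacks() {
        if (recovery.length == 0) {
            recovery = bulk.length > 0 ? bulk : reg;
        }
        if (bulk.length == 0) {
            bulk = reg;
        }
    }
}

class ChannelGroupsDemo {
    public static void main(String[] args) {
        ChannelGroups groups = new ChannelGroups(0, 0, 2, 1, 1);
        System.out.println(Arrays.toString(groups.recovery)); // prints "[reg-0, reg-1]"
        System.out.println(Arrays.toString(groups.bulk));     // prints "[reg-0, reg-1]"
    }
}
```

Note that the fallback shares the same array rather than copying it, so recovery and bulk traffic would then compete with regular requests for the same channels.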
Summary
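To recap the NettyTransport startup and connection process: startup brings up the client and the server separately and injects the corresponding MessageHandler into each; the startup itself is simply the netty startup procedure. The server IP and port are then bound. No connection exists at this point. Connections are made when the node starts: the node obtains the cluster information and establishes the five kinds of connections described above to each node in the cluster.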
That is the startup and connection process of NettyTransport. Another important job of transport is sending requests and handling them; those features will be analyzed in the next article.