elasticsearch集群发现zendiscovery的ping机制分析-亚博电竞手机版
目录
- zendiscovery实现机制
- 广播的过程
- nodeping处理代码
- ping请求的发送策略
- 总结
zendiscovery实现机制
ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群。zendiscovery实现了两种ping机制:广播与单播。本篇将详细分析一些这multicastzenping机制的实现为后面的集群发现和master选举做好铺垫。
广播的过程
首先看一下广播(multicastzenping),广播的原理很简单,节点启动后向网络发送广播信息,任何收到的节点只要集群名字相同都应该对此广播信息作出回应。这样该节点就获取了集群的相关信息。它定义了一个action:"internal:discovery/zen/multicast"和广播的信息头:internal_header 。之前说过nettytransport是cluster通信的基础,但是广播却没有使它。它使用了java的multicastsocket。这里简单的介绍一下multicastsocket的使用。它是一个udp 机制的socket,用来进行多个数据包的广播。它可以帮到一个ip形成一个group,任何multicastsocket都可以join进来,组内的socket发送的信息会被订阅了改组的所有机器接收到。elasticsearch对其进行了封装形成了multicastchannel,有兴趣可以参考相关源码。
首先看一下multicastzenping的几个辅助内部类:
它总共定义了4个内部类,这些内部类和它一起完成广播功能。finalizingpingcollection是一pingresponse的容器,所有的响应都用它来存储。multicastpingresponserequesthandler它是response处理类,类似于之前所说的nettytransporthandler,它虽然使用的不是netty,但是它也定义了一个messagereceived的方法,当收到请求时直接返回一个response。
multicastpingrespons恰卡编程网e就不用细说了,它就是一个响应类。最后要着重说一下receiver类,因为广播并不是使用nettytransport,因此对于消息处理逻辑都在receiver中。在初始化multicastzenping时会将receiver注册进去。
protected void dostart() throws elasticsearchexception { try { .... multicastchannel = multicastchannel.getchannel(nodename(), shared, new multicastchannel.config(port, group, buffersize, ttl, networkservice.resolvepublishhostaddress(address)), new receiver());//将receiver注册到channel中 } catch (throwable t) { .... } }
receiver类基础了listener,实现了3个方法,消息经过onmessage方法区分,如果是内部ping则使用handlenodepingrequest方法处理,否则使用handleexternalpingrequest处理,区分方法很简单,就是读取信息都看它是否符合所定义的internal_header 信息头。
nodeping处理代码
private void handlenodepingrequest(int id, discoverynode requestingnodex, clustername requestclustername) { .... final discoverynodes discoverynodes = contextprovider.nodes(); final discoverynode requestingnode = requestingnodex; if (requestingnode.id().equals(discoverynodes.localnodeid())) { // 自身发出的ping,忽略 return; } //只接受本集群ping if (!requestclustername.equals(clustername)) { ...return; } // 两个client间不需要ping if (!discoverynodes.localnode().shouldconnectto(requestingnode)) {return; } //新建一个response final multicastpingresponse multicastpingresponse = new multicastpingresponse(); multicastpingresponse.id = id; multicastpingresponse.pingresponse = new pingresponse(discoverynodes.localnode(), discoverynodes.masternode(), clustername, contextprovider.nodehasjoinedclusteronce()); //无法连接的情况 if (!transportservice.nodeconnected(requestingnode)) { // do the connect and send on a thread pool threadpool.generic().execute(new runnable() { @override public void run() { // connect to the node if possible try { transportservice.connecttonode(requestingnode); transportservice.sendrequest(requestingnode, action_name, multicastpingresponse, new emptytransportresponsehandler(threadpool.names.same) { @override public void handleexception(transportexception exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingnode); } }); } catch (exception e) { if (lifecycle.started()) { logger.warn("failed to connect to requesting node {}", e, requestingnode); } } } }); } else { transportservice.sendrequest(requestingnode, action_name, multicastpingresponse, new emptytransportresponsehandler(threadpool.names.same) { @override public void handleexception(transportexception exp) { if (lifecycle.started()) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingnode); } } }); } } }
另外的一个方法是处理外部ping信息,处理过程是返回cluster的信息(这种外部ping的具体作用没有研究不是太清楚)。以上是响应multicastzenping的过程,收到其它节点的响应信息后它会把本节点及集群的master节点相关信息返回给广播节点。这样广播节点就获知了集群的相关信息。在multicastzenping类中还有一个类multicastpingresponserequesthandler,它的作用是广播节点对其它节点对广播信息响应的回应,广播节点的第二次发送信息的过程。它跟其它transportrequesthandler一样它有messagereceived方法,在启动时注册到transportserver中,只处理一类action:"internal:discovery/zen/multicast"。
ping请求的发送策略
代码如下:
public void ping(final pinglistener listener, final timevalue timeout) { .... //产生一个id final int id = pingidgenerator.incrementandget(); try { receivedresponses.put(id, new pingcollection()); sendpingrequest(id);//第一次发送ping请求 // 等待时间的1/2后再次发送一个请求 threadpool.schedule(timevalue.timevaluemillis(timeout.millis() / 2), threadpool.names.generic, new abstractrunnable() { @override public void onfailure(throwable t) { logger.warn("[{}] failed to send second ping request", t, id); finalizepingcycle(id, listener); } @override public void dorun() { sendpingrequest(id); //再过1/2时间再次发送一个请求 threadpool.schedule(timevalue.timevaluemillis(timeout.millis() / 2), threadpool.names.generic, new abstractrunnable() { @override public void onfailure(throwable t) { logger.warn("[{}] failed to send third ping request", t, id); finalizepingcycle(id, listener); } @override public void dorun() { // make one last ping, but finalize as soon as all nodes have responded or a timeout has past pingcollection collection = receivedresponses.get(id); finalizingpingcollection finalizingpingcollection = new finalizingpingcollection(id, collection, collection.size(), listener); receivedresponses.put(id, finalizingpingcollection); logger.trace("[{}] sending last pings", id); sendpingrequest(id); //最后一次发送请求,超时的1/4后 threadpool.schedule(timevalue.timevaluemillis(timeout.millis() / 4), threadpool.names.generic, new abstractrunnable() { @override public void onfailure(throwable t) { logger.warn("[{}] failed to finalize ping", t, id); } @override protected void dorun() throws exception { finalizepingcycle(id, listener); } }); } }); } }); } catch (exception e) { logger.warn("failed to ping", e); finalizepingcycle(id, listener); } }
发送过程主要是调用sendpingrequest(id)方法,在该方法中会将id,信息头,版本,本地节点信息一起写入到bytesstreamoutput中然后将其进行广播,这个广播信息会被其它机器上的receiver接收并处理,并且响应该ping请求。另外一个需要关注的是以上加说明的部分,它通过链时的定期发送请求,在等待时间内可能会发出4次请求,这种发送方式会造成大量的ping请求重复,幸好ping的资源消耗小,但是好处是可以尽可能保证在timeout这个时间段内集群的新增节点都能收到这个ping信息。在单播中也采用了该策略。
总结
广播的过程:广播使用的是jdk的multicastsocket,在timeout时间内4次发生ping请求,ping请求包括一个id,信息头,本地节点的一些信息;这些信息在其它节点中被接收到交给receiver处理,receiver会将集群的master和本机的相关信息通过transport返回给广播节点。广播节点收到这些信息后会理解使用transport返回一个空的response。至此一个广播过程完成。
在节点分布在多个网段时,广播就失效了,因为广播信息不可达。这个时间就需要使用单播去ping指定的节点获取cluster的相关信息。这就是单播的用处。单播使用的是nettytransport,它会使用跟广播一样的链式请求向指定的节点发送请求。信息的处理方式是之前所介绍的nettytransport标准的信息处理过程。
以上就是elasticsearch集群发现zendiscovery的ping机制分析的详细内容,更多关于elasticsearompeifeisch集群发现zendiscovery ping的资料请关注亚博电竞手机版其它相关文章!