elasticsearch的zendiscovery和master选举机制原理分析-亚博电竞手机版

目录

  • 前言
  • join的代码
  • findmaster方法
  • 总结

前言

上一篇通过electmasterservice源码,分析了master选举的原理的大部分内容:master候选节点id排序保证选举一致性及通过设置最小可见候选节点数目避免brain split。节点排序后选举只能保证局部一致性,如果发生节点接收到了错误的集群状态就会选举出错误的master,因此必须有其它措施来保证选举的一致性。这就是上一篇所提到的第二点:被选举的数量达到一定的数目同时自己也选举自己,这个节点才能成为master。这一点体现在zendiscovery中,本篇将结合节点的发现过程进一步介绍master选举机制。

节点启动后首先启动join线程,join线程会寻找cluster的master节点,如果集群之前已经启动,并且运行良好,则试图连接集群的master节点,加入集群。否则(集群正在启动)选举master节点,如果自己被选为master,则向集群中其它节点发送一个集群状态更新的task,如果master是其它节点则试图加入该集群。

join的代码

private void innerjoincluster() { discoverynode masternode = null; final thread currentthread = thread.currentthread();      //一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性 while (masternode == null && jointhreadcontrol.jointhreadactive(currentthread)) { masternode = findmaster(); }       //有可能自己会被选举为master(集群启动,或者加入时正在选举)       if (clusterservice.localnode().equals(masternode)) {       //如果本身是master,则需要向其它所有节点发送集群状态更新 clusterservice.submitstateupdatetask("zen-disco-join (elected_as_master)", priority.immediate, new processedclusterstatenonmasterupdatetask() { @override public clusterstate execute(clusterstate currentstate) {             //选举时错误的,之前的master状态良好,则不更新状态,仍旧使用之前状态。 if (currentstate.nodes().masternode() != null) { return currentstate; } discoverynodes.builder builder = new discoverynodes.builder(currentstate.nodes()).masternodeid(currentstate.nodes().localnode().id()); // update the fact that we are the master... clusterblocks clusterblocks = clusterblocks.builder().blocks(currentstate.blocks()).removeglobalblock(qwnardiscoverysettings.getnomasterblock()).build(); currentstate = clusterstate.builder(currentstate).nodes(builder).blocks(clusterblocks).build(); // eagerly run reroute to remove dead nodes from routing table routingallocation.result result = allocationservice.reroute(currentstate); return clusterstate.builder(currentstate).routingresult(result).build(); } @override public void onfailure(string source, throwable t) { logger.error("unexpected failure during [{}]", t, source); jointhreadcontrol.markthreadasdoneandstartnew(currentthread); } @override public void clusterstatephttp://www.cppcns.comrocessed(string source, clusterstate oldstate, clusterstate newstate) { if (newstate.nodes().localnodemaster()) { // we only starts nodesfd if we are master (it may be that we received a cluster state while pinging) jointhreadcontrol.markthreadasdone(currentthread); nodesfd.updatenodesandping(newstate); // start the nodes fd } else { // if we're not a master it means another node published a cluster state while we were pinging // make sure we go through another pinging round and actively join it jointhreadcontrol.markthreadasdoneandstartnew(currentthread); } sendinitialstateeventifneeded(); long count = clusterjoinscounter.incrementandget(); logger.trace("cluster joins counter set to [{}] (elected as master)", count); } }); } else { // 找到的节点不是我,试图连接该master final boolean success = joinelectedmaster(masternode); // finalize join through the cluster state update thread final discoverynode finalmasternode = masternode; clusterservice.submitstateupdatetask("finalize_join (" masternode ")", new clusterstatenonmasterupdatetask() { @override public clusterstate execute(clusterstate currentstate) throws exception { if (!success) { // failed to join. try again... jointhreadcontrol.markthreadasdoneandstartnew(currentthread); return currentstate; } if (currentstate.getnodes().masternode() == null) { // post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have // a valid master. logger.debug("no master node is set, despite of join request completing. retrying pings."); jointhreadcontrol.markthreadasdoneandstartnew(currentthread); return currentstate; } if (!currentstate.getnodes().masternode().equals(finalmasternode)) { return jointhreadcontrol.stoprunningthreadandrejoin(currentstate, "master_switched_while_finalizing_join"); } // note: we do not have to start master fault detection here because it's set at {@link #handlenewclusterstatefrommaster } // when the first cluster state arrives. jointhreadcontrol.markthreadasdone(currentthread); return currentstate; } @override public void onfailure(string source, @nullable throwable t) { logger.error("unexpected error while trying to finalize cluster join", t); jointhreadcontrol.markthreadasdoneandstartnew(currentthread); } }); } }

以上就是join的过程。zendiscovery在启动时会启动一个join线程,这个线程调用了该方法。同时在节点离开,master丢失等情况下也会重启这一线程仍然运行join方法。

findmaster方法

这个方法体现了master选举的机制。代码如下:

private discoverynode findmaster() {       //ping集群中的节点 zenping.pingresponse[] fullpingresponses = pingservice.pingandwait(pingtimeout); if (fullpingresponses == null) {return null; }// 过滤所得到的ping响应,虑除client节点,单纯的data节点 list pingresponses = lists.newarraylist(); for (zenping.pingresponse pingresponse : fullpingresponses) { discoverynode node = pingresponse.node(); if (masterelectionfilterclientnodes && (node.clientnode() || (!node.masternode() && !node.datanode()))) { // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client) } else if (masterelectionfilterdatanodes && (!node.masternode() && node.datanode())) { // filter out data node that is not also master } else { pingresponses.add(pingresponse); } } final discoverynode localnode = clusterservice.localnode(); list pingmasters = newarraylist();      //获取所有ping响应中的master节点,如果master节点是节点本身则过滤掉。pingmasters列表结果要么为空(本节点是master)要么是同一个节点(出现不同节点则集群出现了问题 不过没关系,后面会进行选举) for (zenping.pingresponse pingresponse : pingresponses) { if (pingresponse.master() != null) { if (!localnode.equals(pingresponse.master())) { pingmasters.add(pingresponse.master()); } } } // nodes discovered during pinging set activenodes = sets.newhashset(); // nodes discovered who has previously been part of the cluster and do not ping for the very first time set joinedonceactivenodes = sets.newhashset(); version minimumpingversion = localnode.version();     for (zenping.pingresponse pingresponse : pingresponses) {      activenodes.add(pingresponse.node());     minimumpingversion = version.smallest(pingresponse.node().version(), minimumpingversion);     if (pingresponse.hasjoinedonce() != null && pingresponse.hasjoinedonce()) {   joinedonceactivenodes.add(pingresponse.node());     }     }

//本节点暂时是master也要加入候选节点进行选举 if (localnode.masternode()) { activenodes.add(localnode); long joinscounter = clusterjoinscounter.get(); if (joinscounter > 0) { logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinscounter); joinedonceactivenodes.add(localnode); } }       //pingmasters为空,则本节点是master节点,     if (pingmasters.isempty()) { if (electmaster.hasenoughmasternodes(activenodes)) {//保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果           本次选举节点仍旧是自己,那么本节点才能成为master。这里就体现了master选举的第二条原则。 discoverynode master = electmaster.electmaster(joinedonceactivenodes); if (master != null) { return master; } return electmaster.electmaster(activenodes); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from logger.trace("not enough master nodes [{}]", activenodes); return null; } } else {         //phttp://www.cppcns.comingmasters不为空(pingmasters列表中应该都是同一个节点),本节点没有被选举为master,那就接受之前的选举。 return electmaster.electmaster(pingmasters); } }

上面的重点部分都做了标注,就不再分析。除了findmaster方法,还有一个方法也体现了master选举,那就是handlemastergone。下面是它的部分代码,提交master丢失task部分,

clusterservice.submitstateupdatetask("zen-disco-master_failed (" masternode ")", priority.immediate,newprocessedclusterstatenonmasterupdatetask() {       @override public clusterstate execute(clusterstate currentstate) { //获取到当前集群状态下的所有节点 discoverynodes discoverynodes = discoverynodes.builder(currentstate.nodes()) // make sure the old master node, which has failed, is not part of the nodes we publish .remove(masternode.id()) .masternodeid(null).build();           //rejoin过程仍然是重复findmaster过程           if (rejoin) { return rejoin(clusterstate.builder(currentstate).nodes(discoverynodes).build(), "master left (reason = " reason ")"); }           //无法达到选举数量,进行findmaster过程 if (!electmaster.hasenoughmasternodes(discoverynodes)) { return rejoin(clusterstate.builder(currentstate).nodes(discoverynodes).build(), "not enough master nodes after master left (reason = " reason ")"); }           //在当前集群状态下,如果候选节点数量达到预期数量,那么选举出来的节点一定是同一个节点,因为所有的节点看到的集群states是一致的 final discoverynode electedmaster = electmaster.electmaster(discoverynodes); // elect master final discoverynode localnode = currentstate.nodes().localnode(); .... }

从以上的代码可以看到master选举节点的应用场景,无论是findmaster还是handlemastergone,他们都保证了选举一致性。那就是所选节点数量必须要达到一定的数量,否则不能认为选举成功,进入等待环境。如果当前节点被其它节点选举为master,仍然要进行选举一次以保证选举的一致性。这样在保证了选举数量同时对候选节点排序从而保证选举的一致性。

发现和加入集群是zendiscovery的主要功能,当然它还有一些其它功能,如处理节点离开(handleleaverequest),处理master发送的最小clustersates(handlenewclusterstatefrommaster)等功能。这里就不一一介绍,有兴趣请参考相关源码。

总结

本节结合zendiscovery,分析了master选举的另外一部分内容。同时zendiscovery是节点发现集群功能的集合,它主要功能是发现(选举)出集群的master节点,并试图加入集群。同时如果 本机是master还会处理节点的离开和节点丢失,如果不是master则会处理来自master的节点状态更新。

以上就是elasticsearch的zendiscovery和master选举机制原理分析的详细内容,更多关于elasticsearch的zendiscovery和master选举机制的资料请关注亚博电竞手机版其它相关文章!

展开全文
内容来源于互联网和用户投稿,文章中一旦含有亚博电竞手机版的联系方式务必识别真假,本站仅做信息展示不承担任何相关责任,如有侵权或涉及法律问题请联系亚博电竞手机版删除

最新文章

网站地图