elasticsearch集群cluster discovery可配式模块示例分析-亚博电竞手机版

目录

  • 前言
  • discovery模块的概述
  • cluster节点探测
  • masterfaultdetection的启动代码
    • master连接失败的逻辑
  • masterping的关键代码

前言

elasticsearch cluster实现了自己发现机制zen。discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping。本篇会首先概述以下discovery这一部分的功能,然后介绍节点检测。其它内容会在接下来介绍。

discovery模块的概述

discovery是可配式模块,官方支持亚马逊的azure discovery,google compute engine,ec2 discovery三种发现机制,根据插件规则完全可以自己实现其它的发现机制。整个模块通过实现guice的discoverymodule对外提供模块的注册和启动, 默认使用zen discovery。发现模块对外接口为discoveryservice,它的方法如下所示:

它本质上是discovery的一个代理,所有的功能最终都是由所绑定的discovery所实现的。节点启动时通过discoverymodule获取discoveryservice,然后启动discoveryservice,discoveryservice启动绑定的discovery,整个功能模块就完成了加载和启动。这也是elasticsearch所有模块的实现方式,通过module对外提供绑定和获取,通过service接口对外提供模块的功能,在后面的分析中会经常遇到。

cluster节点探测

接下来分析cluster的一个重要功能就是节点探测。cluster中不能没有master节点,因此集群中所有节点都要周期探测master节点,一旦无法检测到,将会进行master选举。同时作为master,对于节点变动也要时刻关注,因此它需要周期性探测集群中所有节点,确保及时剔除已经宕机的节点。这种相互间的心跳检测就是cluster的faultdetection。下图是faultdetection的继承关系:

有两种实现方式,分别是master探测集群中其它节点和其它节点对master节点的探测。

faultdetection只要一个抽象方法handletransportdisconnect,该方法在内部类fdconnectionlistener中被调用。在elasticsearch中大量使用了listener的异步方式,异步可以极大提升系统性能。它的代码如下所示:

private class fdconnectionlistener implements transportconnectionlistener { @override public void onnodeconnected(discoverynode node) { } @override public void onnodedisconnected(discoverynode node) { handletransportdisconnect(node); } }

faultdetection启动时会注册相应的fdconnetionlistener,当探测到节点丢失,会通过onnodedisconnected方法回调对于的handletransportdisconnect进行处理。

masterfaultdetection的启动代码

privatevoidinnerstart(finaldiscoverynode masternode) { this.masternode = masternode; this.retrycount = 0; this.notifiedmasterfailure.set(false); // 尝试连接master节点 try { transportservice.connecttonode(masternode); } catch (final exception e) { // 连接失败通知masternode失败 notifymasterfailure(masternode, "failed to perform initial connect [" e.getmessage() "]"); return; }     //关闭之前的masterping,重启新的masterping if (masterpinger != null) { masterpinger.stop(); } this.masterpinger = new masterpinger(); // 周期之后启动masterping,这里并没有周期启动masterping,只是设定了延迟时间。 threadpool.schedule(pinginterval, threadpool.names.same, masterpinger); }

代码有有详细注释,就不再过多解释。

master连接失败的逻辑

代码如下:

private void notifymasterfailure(final discoverynode masternode, final string reason) { if (notifiedmasterfailure.compareandset(false, true)) { threadpool.generic().execute(new runnable() { @override public void run() {             //通知所有listener master丢失 for (listener listener : listeners) { listener.onmasterfailure(masternode, reason); } } }); stop("master failure, " reason); } }

在zendiscovery中实现了listener.onmasterfailure接口。会进行master丢失的相关处理,在后面再分析。

masterping的关键代码

private class masterpinger implements runnable { private volatile boolean running = true; public void stop() { this.running = false; } @override public void run() { if (!running) { // return and don't spawn... return; } final discoverynode mastertoping = masternode; final masterpingrequest request = new masterpingrequest(clusterservice.localnode().id(), mastertoping.id(), clustername); final transportrequestoptions options = options().withtype(transportrequestoptions.type.ping).withtimeout(pingretrytimeout); transportservice.sendrequest(mastertoping, master_ping_action_name, request, options, new basetransportresponsehandler() { @override public masterpingresponseresponse newinstance() { return new masterpingresponseresponse(); } @override public void handleresponse(masterpingresponseresponse response) { if (!running) { return; } // reset the counter, we got a good result masterfaultdetection.this.retrycount = 0; // check if the master node did not get switched on us..., if it did, we simply return with no reschedule if (mastertoping.equals(masterfaultdetection.this.masternode())) { // 启动新的ping周期 threadpool.schedule(pinginterval, threadpool.names.same, masterpinger.this); } } @override public void handleexception(transportexception exp) { if (!running) { return; } synchronized (masternodemutex) { // check if the master node did not get switched on us... if (mastertoping.equals(masterfaultdetection.this.masternode())) { if (exp instanceof connecttransportexception || exp.getcause() instanceof connecttransportexception) { handletransportdisconnect(mastertoping); return; } else if (exp.getcause() instanceof nolongermasterexception) { logger.debug("[master] pinging a master {} that is no longer a master", masternode); notifymasterfailure(mastertoping, "no longer master"); return; } else if (exp.getcause() instanceof notmasterexception) { logger.debug("[master] pinging a master {} that is not the master", masternode); notifymasterfailure(mastertoping, "not master"); return; } else if (exp.getcause() instanceof nodedoesnotexistonmasterexception) { logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masternode); notifymasterfailure(mastertoping, "do not exists on master, act as master failure"); return; } int retrycount = masterfaultdetection.this.retrycount; logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masternode, retrycount, pingretrycount); if (retrycount >= pingretrycount) { logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masternode, pingretrycount, pingretrytimeout); // not good, failure notifymasterfailure(mastertoping, "failed to ping, tried [" pingretrycount "] times, each with maximum [" pingretrytimeout "] timeout"); } else { // resend the request, not reschedule, rely on send timeout transportservice.sendrequest(mastertoping, master_ping_action_name, request, options, this); } } } } ); } }

masterping是一个线程,在innerstart的方法中没有设定周期启动masterping,但是masterping需要周期进行,这个秘密就在run 方法中,如果ping成功就会重启一个新的ping。这样既保证了ping线程的唯一性同时也保证了ping的顺 run the pinger on the pool as it will run on later threadpool.schedule(timevalue.timevaluemillis(0), threadpool.names.same, masterpinger); } catch (exception e) {             //连接出现异常,启动master节点丢失通知 logger.trace("[master] [{}] transport disconnected (with verified connect)", masternode); notifymasterfailure(masternode, "transport disconnected (with verified connect)"); } } else {           //不需要重连,通知master丢失。 logger.trace("[master] [{}] transport disconnected", node); notifymasterfailure(node, "transport disconnected"); } } }

这就是masterfaultdetection的整个流程:启动中如果master丢失则通知节点丢失,否则在一定延迟(3s)后启动masterping,masterping线程尝试连接master节点,如果master节点网络失联,尝试再次连接。master节点收到masterpingrequest后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应是异常则启动master丢失通知,否则此次ping结束。在一定延迟后启动新的masterping线程。

nodefaultdetection的逻辑跟实现上跟masterfualtdetetion相似,区别主要在于ping异常处理上。当某个节点出现异常或者没有响应时,会启动节点丢失机制,只是受到通知后的处理逻辑不通。就不再详细分析,有兴趣可以参考具体代码,希望大家以后多多支持我们!

展开全文
内容来源于互联网和用户投稿,文章中一旦含有亚博电竞手机版的联系方式务必识别真假,本站仅做信息展示不承担任何相关责任,如有侵权或涉及法律问题请联系亚博电竞手机版删除

最新文章

网站地图