Elasticsearch源码剖析之master选举策略

2020-06-03 00:00:00 集群 节点 响应 选举 候选

当 Elasticsearch 节点刚启动,或节点 ping master 节点超时时,会触发当前节点重新加入集群。本文描述的就是加入集群这一过程。

Elasticsearch 中加入集群的逻辑在 `org.elasticsearch.discovery.zen.ZenDiscovery#innerJoinCluster` 中实现。

innerJoinCluster

private void innerJoinCluster() {
 DiscoveryNode masterNode = null;
 final Thread currentThread = Thread.currentThread();
  nodeJoinController.startElectionContext();
  while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
 masterNode = findMaster();
 }
if (!joinThreadControl.joinThreadActive(currentThread)) {
 logger.trace("thread is no longer in currentJoinThread. Stopping.");
 return;
 }
if (transportService.getLocalNode().equals(masterNode)) {
 final int requiredJoins = Math.max(, electMaster.minimumMasterNodes() - 1); // we count as one
 logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
 nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
 new NodeJoinController.ElectionCallback() {
 @Override
 public void onElectedAsMaster(ClusterState state) {
 synchronized (stateMutex) {
 joinThreadControl.markThreadAsDone(currentThread);
 }
 }
@Override
 public void onFailure(Throwable t) {
 logger.trace("failed while waiting for nodes to join, rejoining", t);
 synchronized (stateMutex) {
 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
 }
 }
 }
);
 } else {
 // process any incoming joins (they will fail because we are not the master)
 nodeJoinController.stopElectionContext(masterNode + " elected");
// send join request
 final boolean success = joinElectedMaster(masterNode);
synchronized (stateMutex) {
 if (success) {
 DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
 if (currentMasterNode == null) {
 // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
 // a valid master.
 logger.debug("no master node is set, despite of join request completing. retrying pings.");
 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
 } else if (currentMasterNode.equals(masterNode) == false) {
 // update cluster state
 joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
 }
joinThreadControl.markThreadAsDone(currentThread);
 } else {
 // failed to join. Try again...
 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
 }
 }
 }
 }

相关文章