A look at elasticsearch's MembershipAction

This article takes a look at elasticsearch's MembershipAction.

MembershipAction

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public class MembershipAction {

    private static final Logger logger = LogManager.getLogger(MembershipAction.class);

    public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
    public static final String DISCOVERY_JOIN_VALIDATE_ACTION_NAME = "internal:discovery/zen/join/validate";
    public static final String DISCOVERY_LEAVE_ACTION_NAME = "internal:discovery/zen/leave";

    //......

    private final TransportService transportService;

    private final MembershipListener listener;

    public MembershipAction(TransportService transportService, MembershipListener listener,
                            Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
        this.transportService = transportService;
        this.listener = listener;

        transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
            ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
        transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
            () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
            new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
        transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
            ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
    }

    public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
        transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
            EmptyTransportResponseHandler.INSTANCE_SAME);
    }

    public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Validates the join request, throwing a failure if it failed.
     */
    public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
        transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    //......
}

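MembershipAction defines three kinds of requests: LeaveRequest, JoinRequest and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler and ValidateJoinRequestRequestHandler. The constructor registers each handler with the TransportService under its action name.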

TransportRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequest.java

public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {

    public static class Empty extends TransportRequest {
        public static final Empty INSTANCE = new Empty();
    }

    /**
     * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
     */
    private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;

    public TransportRequest() {
    }

    public TransportRequest(StreamInput in) throws IOException {
        parentTaskId = TaskId.readFromStream(in);
    }

    /**
     * Set a reference to task that created this request.
     */
    @Override
    public void setParentTask(TaskId taskId) {
        this.parentTaskId = taskId;
    }

    /**
     * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
     */
    @Override
    public TaskId getParentTask() {
        return parentTaskId;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        parentTaskId = TaskId.readFromStream(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        parentTaskId.writeTo(out);
    }
}

TransportRequest extends the TransportMessage class and implements the TaskAwareRequest interface.

LeaveRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public static class LeaveRequest extends TransportRequest {

    private DiscoveryNode node;

    public LeaveRequest() {
    }

    private LeaveRequest(DiscoveryNode node) {
        this.node = node;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        node = new DiscoveryNode(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        node.writeTo(out);
    }
}

LeaveRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementation, each method also reads or writes the DiscoveryNode.

JoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public static class JoinRequest extends TransportRequest {

    private DiscoveryNode node;

    public DiscoveryNode getNode() {
        return node;
    }

    public JoinRequest() {
    }

    private JoinRequest(DiscoveryNode node) {
        this.node = node;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        node = new DiscoveryNode(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        node.writeTo(out);
    }
}

JoinRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementation, each method also reads or writes the DiscoveryNode.
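The readFrom/writeTo convention used by these request classes (parent fields first, then the subclass's own fields, in the same order on both sides) can be illustrated with plain java.io streams. This is only a minimal sketch: SketchRequest and SketchNodeRequest are hypothetical stand-ins, and DataInputStream/DataOutputStream replace elasticsearch's StreamInput/StreamOutput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for TransportRequest: it only carries a parent task id.
class SketchRequest {
    String parentTaskId = "";

    void readFrom(DataInputStream in) throws IOException {
        parentTaskId = in.readUTF();
    }

    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(parentTaskId);
    }
}

// Hypothetical stand-in for JoinRequest/LeaveRequest: adds a single node field.
class SketchNodeRequest extends SketchRequest {
    String nodeId;

    SketchNodeRequest() {
    }

    SketchNodeRequest(String nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    void readFrom(DataInputStream in) throws IOException {
        super.readFrom(in);      // parent fields first
        nodeId = in.readUTF();   // then this class's own field
    }

    @Override
    void writeTo(DataOutputStream out) throws IOException {
        super.writeTo(out);      // must mirror the order used in readFrom
        out.writeUTF(nodeId);
    }

    // Serializes the request to bytes and reads it back into a fresh instance.
    static SketchNodeRequest roundTrip(SketchNodeRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            request.writeTo(new DataOutputStream(bytes));
            SketchNodeRequest copy = new SketchNodeRequest();
            copy.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The no-argument constructor plays the same role as the public JoinRequest() constructor above: the receiving side creates an empty instance and then fills it via readFrom.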

ValidateJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

static class ValidateJoinRequest extends TransportRequest {

    private ClusterState state;

    ValidateJoinRequest() {}

    ValidateJoinRequest(ClusterState state) {
        this.state = state;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        this.state = ClusterState.readFrom(in, null);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        this.state.writeTo(out);
    }
}

ValidateJoinRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementation, each method also reads or writes the ClusterState.

TransportRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java

public interface TransportRequestHandler<T extends TransportRequest> {

    /**
     * Override this method if access to the Task parameter is needed
     */
    default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
        messageReceived(request, channel);
    }

    void messageReceived(T request, TransportChannel channel) throws Exception;
}

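The TransportRequestHandler interface declares a messageReceived method and also provides a default messageReceived overload that takes an additional Task parameter and simply delegates to the two-argument method.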
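A small, self-contained sketch of this two-overload pattern follows. SketchHandler is a hypothetical interface, a List<String> stands in for TransportChannel and a String for Task; none of these names come from elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of TransportRequestHandler.
interface SketchHandler<T> {

    // Task-aware overload: by default it ignores the task and delegates.
    default void messageReceived(T request, List<String> channel, String task) {
        messageReceived(request, channel);
    }

    // The only abstract method, so most handlers implement just this one.
    void messageReceived(T request, List<String> channel);
}

class SketchHandlerDemo {

    // Calls the three-argument overload, the way a dispatcher that always has a task would.
    static List<String> dispatch(String request) {
        List<String> channel = new ArrayList<>();
        SketchHandler<String> handler = (req, ch) -> ch.add("handled " + req);
        handler.messageReceived(request, channel, "task-1");
        return channel;
    }
}
```

Because the caller always goes through the task-aware overload, a handler that needs the task can override it, while handlers such as the ones in MembershipAction only implement the two-argument method.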

LeaveRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {

    @Override
    public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
        listener.onLeave(request.node);
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}

LeaveRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived calls MembershipListener's onLeave method and then sends back an empty response.

JoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

    @Override
    public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
        listener.onJoin(request.getNode(), new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.warn("failed to send back failure on join request", inner);
                }
            }
        });
    }
}

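JoinRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived calls MembershipListener's onJoin method, passing a JoinCallback that sends an empty response on success and sends the exception back on failure.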

ValidateJoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

    private final Supplier<DiscoveryNode> localNodeSupplier;
    private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

    ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
                                      Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
        this.localNodeSupplier = localNodeSupplier;
        this.joinValidators = joinValidators;
    }

    @Override
    public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
        DiscoveryNode node = localNodeSupplier.get();
        assert node != null : "local node is null";
        joinValidators.stream().forEach(action -> action.accept(node, request.state));
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}

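ValidateJoinRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived runs each of the joinValidators in turn against the local node and the ClusterState carried in the request, and sends an empty response if none of them throws.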
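The "collection of BiConsumer validators" idea can be sketched in isolation as below. This is an illustration under simplifying assumptions: a String stands in for DiscoveryNode, an Integer for ClusterState, and both sample validators are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

class JoinValidationSketch {

    // Runs every validator in order; the first one that throws aborts the validation.
    static String validate(Collection<BiConsumer<String, Integer>> validators, String node, int stateVersion) {
        try {
            validators.forEach(validator -> validator.accept(node, stateVersion));
            return "ok";
        } catch (IllegalStateException e) {
            return "rejected: " + e.getMessage();
        }
    }

    // Two made-up validators; a validator signals rejection by throwing.
    static Collection<BiConsumer<String, Integer>> sampleValidators() {
        List<BiConsumer<String, Integer>> validators = new ArrayList<>();
        validators.add((node, version) -> {
            if (node.isEmpty()) {
                throw new IllegalStateException("empty node name");
            }
        });
        validators.add((node, version) -> {
            if (version < 0) {
                throw new IllegalStateException("negative state version");
            }
        });
        return validators;
    }
}
```

In the real handler the exception is not caught locally; it propagates out of messageReceived, which is how the failure reaches the node that sent the validation request.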

MembershipAction.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public interface MembershipListener {
    void onJoin(DiscoveryNode node, JoinCallback callback);

    void onLeave(DiscoveryNode node);
}

public interface JoinCallback {
    void onSuccess();

    void onFailure(Exception e);
}

The MembershipListener interface defines the onJoin and onLeave methods, and onJoin takes a JoinCallback. An implementation class with the same name lives inside the ZenDiscovery class.

ZenDiscovery.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    //......

    private class MembershipListener implements MembershipAction.MembershipListener {
        @Override
        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
            handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
        }

        @Override
        public void onLeave(DiscoveryNode node) {
            handleLeaveRequest(node);
        }
    }

    void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
        if (nodeJoinController == null) {
            throw new IllegalStateException("discovery module is not yet started");
        } else {
            // we do this in a couple of places including the cluster update thread. This one here is really just best effort
            // to ensure we fail as fast as possible.
            onJoinValidators.stream().forEach(a -> a.accept(node, state));
            if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
            }
            // try and connect to the node, if it fails, we can raise an exception back to the client...
            transportService.connectToNode(node);

            // validate the join request, will throw a failure if it fails, which will get back to the
            // node calling the join request
            try {
                membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
                    e);
                callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                return;
            }
            nodeJoinController.handleJoinRequest(node, callback);
        }
    }

    private void handleLeaveRequest(final DiscoveryNode node) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (localNodeMaster()) {
            removeNode(node, "zen-disco-node-left", "left");
        } else if (node.equals(clusterState().nodes().getMasterNode())) {
            handleMasterGone(node, null, "shut_down");
        }
    }

    private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        masterService.submitStateUpdateTask(
                source + "(" + node + "), reason(" + reason + ")",
                new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                nodeRemovalExecutor,
                nodeRemovalExecutor);
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason = " + reason + ")");
            }
        }
    }

    //......
}

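The onJoin method of ZenDiscovery.MembershipListener calls handleJoinRequest. That method first runs the onJoinValidators, checks the major version barrier (unless the cluster state is still not recovered), connects to the joining node and sends it a blocking validate-join request; only then does it call nodeJoinController.handleJoinRequest(node, callback). The onLeave method calls handleLeaveRequest, which calls removeNode when the local node is the master, and calls handleMasterGone when the local node is not the master and the leaving node is the current master.

The removeNode method calls masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.

The handleMasterGone method calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoin, provided the node that left is still the master in the committed state.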
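The branching in handleLeaveRequest is easy to misread, so here is a minimal sketch of just the decision, reduced to a pure function. LeaveDecisionSketch and its string results are hypothetical; the real method performs the actions instead of returning their names.

```java
class LeaveDecisionSketch {

    // Returns the name of the action the excerpt above would take for a leave request.
    static String onLeave(boolean started, boolean localNodeIsMaster, String leavingNode, String currentMaster) {
        if (!started) {
            return "ignore";             // discovery not started: nothing to do
        }
        if (localNodeIsMaster) {
            return "removeNode";         // the master removes the leaving node from the cluster state
        }
        if (leavingNode.equals(currentMaster)) {
            return "handleMasterGone";   // a non-master learns that its master is going away
        }
        return "ignore";                 // a non-master ignores other nodes leaving
    }
}
```

Note the fourth outcome: a non-master node that receives a leave request for some node other than the master does nothing.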

NodeJoinController.handleJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

public class NodeJoinController {

    //......

    /**
     * processes or queues an incoming join request.
     * <p>
     * Note: doesn't do any validation. This should have been done before.
     */
    public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
        if (electionContext != null) {
            electionContext.addIncomingJoin(node, callback);
            checkPendingJoinsAndElectIfNeeded();
        } else {
            masterService.submitStateUpdateTask("zen-disco-node-join",
                node, ClusterStateTaskConfig.build(Priority.URGENT),
                joinTaskExecutor, new JoinTaskListener(callback, logger));
        }
    }

    //......
}

When electionContext is not null, NodeJoinController's handleJoinRequest calls electionContext.addIncomingJoin and then checkPendingJoinsAndElectIfNeeded; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
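The "queue while an election is running, otherwise submit right away" behaviour can be sketched as follows. JoinControllerSketch is a hypothetical simplification: the pending list stands in for electionContext, the submitted list stands in for tasks handed to the master service, and the logic that decides when an election has enough joins (checkPendingJoinsAndElectIfNeeded) is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

class JoinControllerSketch {

    // Non-null only while an election is in progress (plays the role of electionContext).
    private List<String> pendingJoins;

    // Joins that were handed over for processing (plays the role of submitted state update tasks).
    final List<String> submitted = new ArrayList<>();

    synchronized void startElection() {
        pendingJoins = new ArrayList<>();
    }

    synchronized void handleJoinRequest(String node) {
        if (pendingJoins != null) {
            pendingJoins.add(node);   // election running: only queue the join
        } else {
            submitted.add(node);      // no election: process the join immediately
        }
    }

    // Ends the election and flushes everything that was queued meanwhile.
    synchronized void finishElection() {
        if (pendingJoins != null) {
            submitted.addAll(pendingJoins);
            pendingJoins = null;
        }
    }
}
```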

MasterService.submitStateUpdateTask

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

public class MasterService extends AbstractLifecycleComponent {

    //......

    public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }

    public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        final ThreadContext threadContext = threadPool.getThreadContext();
        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            threadContext.markAsSystemContext();

            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

    class Batcher extends TaskBatcher {

        Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
            super(logger, threadExecutor);
        }

        @Override
        protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
            threadPool.generic().execute(
                () -> tasks.forEach(
                    task -> ((UpdateTask) task).listener.onFailure(task.source,
                        new ProcessClusterEventTimeoutException(timeout, task.source))));
        }

        @Override
        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }

        class UpdateTask extends BatchedTask {
            final ClusterStateTaskListener listener;

            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }

            @Override
            public String describeTasks(List<? extends BatchedTask> tasks) {
                return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                    tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
            }
        }
    }

    protected class TaskInputs {
        public final String summary;
        public final List<Batcher.UpdateTask> updateTasks;
        public final ClusterStateTaskExecutor<Object> executor;

        TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
            this.summary = summary;
            this.executor = executor;
            this.updateTasks = updateTasks;
        }

        public boolean runOnlyWhenMaster() {
            return executor.runOnlyOnMaster();
        }

        public void onNoLongerMaster() {
            updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
        }
    }

    //......
}

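The submitStateUpdateTasks method creates a Batcher.UpdateTask for each task and submits them through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method casts the batching key back to the ClusterStateTaskExecutor and calls runTasks with a TaskInputs argument. TaskInputs' runOnlyWhenMaster method delegates to executor.runOnlyOnMaster(), and its onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()) for every task.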
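In the excerpt, UpdateTask passes the executor to BatchedTask as its batching key, and run receives one key together with a list of tasks. The idea of grouping submitted tasks by such a key can be sketched as below. This is an assumption-laden illustration, not TaskBatcher's implementation (which the excerpt does not show): BatcherSketch is hypothetical and leaves out priorities, timeouts and threading.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatcherSketch {

    // Batching key (the executor, in MasterService) -> tasks waiting for that key.
    private final Map<Object, List<String>> pending = new LinkedHashMap<>();

    void submit(Object batchingKey, String task) {
        pending.computeIfAbsent(batchingKey, key -> new ArrayList<>()).add(task);
    }

    // Removes and returns everything queued for one key so it can run as a single batch.
    List<String> drain(Object batchingKey) {
        List<String> batch = pending.remove(batchingKey);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

With this grouping, several join requests that arrive close together and share the same executor instance can be handed to a single execute call, which matches the List-of-tasks signature of ClusterStateTaskExecutor.execute.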

ClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

public interface ClusterStateTaskExecutor<T> {

    ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

    default boolean runOnlyOnMaster() {
        return true;
    }

    default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    }

    default String describeTasks(List<T> tasks) {
        return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
    }

    //......
}

ClusterStateTaskExecutor declares the execute method and provides default implementations of runOnlyOnMaster, clusterStatePublished and describeTasks. It has many implementations, for example JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.

JoinTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

// visible for testing
public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

    private final AllocationService allocationService;

    private final ElectMasterService electMasterService;

    private final Logger logger;

    private final int minimumMasterNodesOnLocalNode;

    public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
                            Logger logger) {
        this.allocationService = allocationService;
        this.electMasterService = electMasterService;
        this.logger = logger;
        minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
    }

    @Override
    public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
        final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();

        final DiscoveryNodes currentNodes = currentState.nodes();
        boolean nodesChanged = false;
        ClusterState.Builder newState;

        if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
            return results.successes(joiningNodes).build(currentState);
        } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
            assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
            // use these joins to try and become the master.
            // Note that we don't have to do any validation of the amount of joining nodes - the commit
            // during the cluster state publishing guarantees that we have enough
            newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
            nodesChanged = true;
        } else if (currentNodes.isLocalNodeElectedMaster() == false) {
            logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
            throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
        } else {
            newState = ClusterState.builder(currentState);
        }

        DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

        assert nodesBuilder.isLocalNodeElectedMaster();

        Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
        Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
        // we only enforce major version transitions on a fully formed clusters
        final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
        // processing any joins
        for (final DiscoveryNode node : joiningNodes) {
            if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
                // noop
            } else if (currentNodes.nodeExists(node)) {
                logger.debug("received a join request for an existing node [{}]", node);
            } else {
                try {
                    if (enforceMajorVersion) {
                        MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
                    }
                    MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
                    // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                    // we have to reject nodes that don't support all indices we have in this cluster
                    MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
                    nodesBuilder.add(node);
                    nodesChanged = true;
                    minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                    maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
                } catch (IllegalArgumentException | IllegalStateException e) {
                    results.failure(node, e);
                    continue;
                }
            }
            results.success(node);
        }
        if (nodesChanged) {
            newState.nodes(nodesBuilder);
            return results.build(allocationService.reroute(newState.build(), "node_join"));
        } else {
            // we must return a new cluster state instance to force publishing. This is important
            // for the joining node to finalize its join and set us as a master
            return results.build(newState.build());
        }
    }

    //......

    @Override
    public boolean runOnlyOnMaster() {
        // we validate that we are allowed to change the cluster state during cluster state processing
        return false;
    }

    @Override
    public void clusterStatePublished(ClusterChangedEvent event) {
        electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
    }
}

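JoinTaskExecutor's execute method builds a new ClusterState from the joiningNodes, recording a success or a failure for each node after the version and index compatibility checks. If the set of nodes changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation; otherwise it still returns a newly built ClusterState instance so that publishing is forced.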

NodeRemovalClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public static class NodeRemovalClusterStateTaskExecutor
    implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {

    private final AllocationService allocationService;
    private final ElectMasterService electMasterService;
    private final Consumer<String> rejoin;
    private final Logger logger;

    public static class Task {

        private final DiscoveryNode node;
        private final String reason;

        public Task(final DiscoveryNode node, final String reason) {
            this.node = node;
            this.reason = reason;
        }

        public DiscoveryNode node() {
            return node;
        }

        public String reason() {
            return reason;
        }

        @Override
        public String toString() {
            return node + " " + reason;
        }
    }

    public NodeRemovalClusterStateTaskExecutor(
            final AllocationService allocationService,
            final ElectMasterService electMasterService,
            final Consumer<String> rejoin,
            final Logger logger) {
        this.allocationService = allocationService;
        this.electMasterService = electMasterService;
        this.rejoin = rejoin;
        this.logger = logger;
    }

    @Override
    public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
        final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
        boolean removed = false;
        for (final Task task : tasks) {
            if (currentState.nodes().nodeExists(task.node())) {
                remainingNodesBuilder.remove(task.node());
                removed = true;
            } else {
                logger.debug("node [{}] does not exist in cluster state, ignoring", task);
            }
        }

        if (!removed) {
            // no nodes to remove, keep the current cluster state
            return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
        }

        final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

        final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
        if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
            final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
            rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                masterNodes, electMasterService.minimumMasterNodes()));
            return resultBuilder.build(currentState);
        } else {
            ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
            return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
        }
    }

    // visible for testing
    // hook is used in testing to ensure that correct cluster state is used to test whether a
    // rejoin or reroute is needed
    ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
        return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
    }

    @Override
    public void onFailure(final String source, final Exception e) {
        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
    }

    @Override
    public void onNoLongerMaster(String source) {
        logger.debug("no longer master while processing node removal [{}]", source);
    }
}

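NodeRemovalClusterStateTaskExecutor implements both the ClusterStateTaskExecutor and ClusterStateTaskListener interfaces. Its execute method removes the given nodes from currentState and builds remainingNodesClusterState; if hasEnoughMasterNodes still holds it calls allocationService.disassociateDeadNodes, otherwise it invokes the Consumer named rejoin and returns the current state unchanged.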
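The shape of that decision can be sketched with plain collections. This is a simplified model rather than the real executor: NodeRemovalSketch is hypothetical, node names are Strings, and every node is assumed to be master-eligible, so the remaining node count is compared directly with minimumMasterNodes.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class NodeRemovalSketch {

    // Returns the node set after the removals, or the unchanged set when nothing was removed
    // or when too few nodes would remain (the real code triggers a rejoin in that case).
    static Set<String> remove(Set<String> current, List<String> leaving, int minimumMasterNodes) {
        Set<String> remaining = new LinkedHashSet<>(current);
        boolean removed = remaining.removeAll(leaving);
        if (!removed) {
            return current;   // none of the leaving nodes was known: keep the current state
        }
        if (remaining.size() < minimumMasterNodes) {
            return current;   // not enough master nodes left: do not publish the removal
        }
        return remaining;     // enough nodes left: the removal takes effect
    }
}
```

For example, with minimumMasterNodes set to 2, removing one node from a three-node set yields a two-node set, while removing one node from a two-node set leaves the set unchanged.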

Summary

MembershipAction defines three kinds of requests (LeaveRequest, JoinRequest and ValidateJoinRequest) together with their TransportRequestHandler implementations (LeaveRequestRequestHandler, JoinRequestRequestHandler and ValidateJoinRequestRequestHandler).

LeaveRequestRequestHandler's messageReceived calls MembershipListener's onLeave method; JoinRequestRequestHandler's messageReceived calls MembershipListener's onJoin method; ValidateJoinRequestRequestHandler's messageReceived runs each of the joinValidators in turn.

The MembershipListener interface defines onJoin and onLeave, and onJoin takes a JoinCallback; ZenDiscovery contains an implementation class with the same name. Its onJoin calls handleJoinRequest, which validates the join and then calls nodeJoinController.handleJoinRequest(node, callback). Its onLeave calls handleLeaveRequest, which calls removeNode when the local node is the master and handleMasterGone when the leaving node is the current master. removeNode calls masterService.submitStateUpdateTask with nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener; handleMasterGone calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoin.

When electionContext is not null, NodeJoinController's handleJoinRequest calls electionContext.addIncomingJoin; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.

MasterService's submitStateUpdateTasks method creates Batcher.UpdateTask instances and submits them through taskBatcher.submitTasks. Batcher extends TaskBatcher, and its run method calls runTasks with a TaskInputs argument; TaskInputs' runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()).

ClusterStateTaskExecutor declares execute and provides default implementations of runOnlyOnMaster, clusterStatePublished and describeTasks; its implementations include JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor. JoinTaskExecutor's execute builds a ClusterState from the joiningNodes and, if the nodes changed, calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation. NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener; its execute removes the given nodes from currentState, builds remainingNodesClusterState, and calls allocationService.disassociateDeadNodes when hasEnoughMasterNodes holds, otherwise it invokes the Consumer named rejoin.

doc

MembershipAction
