Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2: Add Cluster State Applier #34257

Merged
merged 22 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -30,6 +31,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
Expand All @@ -39,6 +42,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
Expand All @@ -60,6 +64,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

public class Coordinator extends AbstractLifecycleComponent implements Discovery {

// the timeout for the publication of each value
Expand All @@ -77,6 +84,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final Object mutex = new Object();
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile Optional<ClusterState> lastCommittedState = Optional.empty();
private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

private final PeerFinder peerFinder;
private final PreVoteCollector preVoteCollector;
Expand All @@ -86,6 +94,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
private final ClusterApplier clusterApplier;
@Nullable
private Releasable electionScheduler;
@Nullable
Expand All @@ -102,7 +111,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
UnicastHostsProvider unicastHostsProvider, Random random) {
UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) {
super(settings);
this.transportService = transportService;
this.masterService = masterService;
Expand All @@ -118,10 +127,12 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit,
logger);
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
}

Expand Down Expand Up @@ -167,13 +178,31 @@ private void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
}
}

private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);

coordinationState.get().handleCommit(applyCommitRequest);
lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState());
// TODO: send to applier
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(lastCommittedState.get()) : lastCommittedState.get();
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// master node applies the committed state at the end of the publication process, not here.
applyListener.onResponse(null);
} else {
clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState,
new ClusterApplyListener() {

@Override
public void onFailure(String source, Exception e) {
applyListener.onFailure(e);
}

@Override
public void onSuccess(String source) {
applyListener.onResponse(null);
}
});
}
}
}

Expand Down Expand Up @@ -299,6 +328,12 @@ void becomeCandidate(String method) {

followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);

if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
});
}
}

preVoteCollector.update(getPreVoteResponse(), null);
Expand Down Expand Up @@ -385,10 +420,20 @@ boolean publicationInProgress() {

@Override
protected void doStart() {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
.addGlobalBlock(NO_MASTER_BLOCK_WRITES))
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
.nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
.build();
applierState = initialState;
clusterApplier.setInitialState(initialState);
}
}

@Override
Expand Down Expand Up @@ -419,6 +464,13 @@ public void invariant() {
assert peerFinder.getCurrentTerm() == getCurrentTerm();
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
if (lastCommittedState.isPresent()) {
assert applierState != null;
assert lastCommittedState.get().term() == applierState.term();
assert lastCommittedState.get().version() == applierState.version();
}
assert mode != Mode.CANDIDATE || applierState.nodes().getMasterNodeId() == null;
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();

Expand All @@ -433,7 +485,8 @@ public void invariant() {

final Set<DiscoveryNode> knownFollowers = followersChecker.getKnownFollowers();
final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
if (becomingMaster == false || publicationInProgress()) {
if (becomingMaster == false ||
(publicationInProgress() && getCurrentTerm() == currentPublication.get().publishedState().term())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this is a surprising addition. How can we have a publication in progress whose term doesn't match our current term? That seems bad. I also had issues with this assertion in the vicinity of becoming a master, perhaps we should rethink this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rewritten the assertions around currentPublication (see b77e5ab). I think we still need currentPublication for the term bumping though.

final ClusterState lastPublishedState
= currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState());
lastPublishedState.nodes().forEach(lastPublishedNodes::add);
Expand Down Expand Up @@ -524,7 +577,7 @@ private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) {
"NO_MASTER_BLOCK should only be added by Coordinator";
// TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL
final ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks()).addGlobalBlock(
DiscoverySettings.NO_MASTER_BLOCK_WRITES).build();
NO_MASTER_BLOCK_WRITES).build();
final DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
return ClusterState.builder(clusterState).blocks(clusterBlocks).nodes(discoveryNodes).build();
} else {
Expand Down Expand Up @@ -593,42 +646,68 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
final Publication publication = new Publication(settings, publishRequest, wrappedAckListener,
transportService.getThreadPool()::relativeTimeInMillis) {

final Publication thisPublication = this;

private void failPublicationAndPossiblyBecomeCandidate(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

assert currentPublication.get() == this;
currentPublication = Optional.empty();

// check if node has not already switched modes (by bumping term)
if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) {
becomeCandidate(reason);
}
}

@Override
protected void onCompletion(boolean committed) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert currentPublication.get() == this;
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
currentPublication = Optional.empty();
updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication

localNodeAckEvent.addListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignore) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term()
&& coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version()
: "onPossibleCompletion: term or version mismatch when publishing [" + this
+ "]: current version is now [" + coordinationState.get().getLastAcceptedVersion()
+ "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]";
assert committed;

// TODO: send to applier
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
clusterApplier.onNewClusterState(thisPublication.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
public void onFailure(String source, Exception e) {
synchronized (mutex) {
failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
}

@Override
public void onSuccess(String source) {
synchronized (mutex) {
assert currentPublication.get() == thisPublication;
currentPublication = Optional.empty();
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());
}

ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
});
}

@Override
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (publishRequest.getAcceptedState().term() == coordinationState.get().getCurrentTerm() &&
publishRequest.getAcceptedState().version() == coordinationState.get().getLastPublishedVersion()) {
becomeCandidate("Publication.onCompletion(false)");
}
failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");

FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
"publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, transportService.getThreadPool().generic());
}, EsExecutors.newDirectExecutorService());
}

@Override
Expand Down Expand Up @@ -667,8 +746,6 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
}
};

assert currentPublication.isPresent() == false
: "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']';
currentPublication = Optional.of(publication);

transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
Expand Down Expand Up @@ -717,7 +794,6 @@ private void cancelActivePublication() {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) {
currentPublication.get().onTimeout();
assert currentPublication.isPresent() == false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -29,7 +30,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class PublicationTransportHandler {
Expand All @@ -41,7 +42,8 @@ public class PublicationTransportHandler {

public PublicationTransportHandler(TransportService transportService,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
Consumer<ApplyCommitRequest> handleApplyCommit) {
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
Logger logger) {
this.transportService = transportService;

transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
Expand All @@ -50,10 +52,27 @@ public PublicationTransportHandler(TransportService transportService,

transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
ApplyCommitRequest::new,
(request, channel, task) -> {
handleApplyCommit.accept(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
(request, channel, task) -> handleApplyCommit.accept(request, new ActionListener<Void>() {

@Override
public void onResponse(Void aVoid) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
logger.debug("failed to send response on commit", e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException ie) {
e.addSuppressed(ie);
logger.debug("failed to send response on commit", e);
}
}
}));
}

public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected synchronized void done() {

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.submit(new Runnable() {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Expand Down
Loading