Skip to content

Commit

Permalink
[Remote state] Integrate remote cluster state in publish/commit flow (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9665)

* Integrate remote cluster state in publish/commit flow

Signed-off-by: Sooraj Sinha <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
soosinha authored and kaushalmahi12 committed Sep 12, 2023
1 parent 5f588ac commit 6c887fd
Show file tree
Hide file tree
Showing 20 changed files with 419 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
- Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670))
- [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -49,6 +52,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;

/**
* The core class of the cluster state coordination algorithm, directly implementing the
Expand All @@ -64,8 +68,8 @@ public class CoordinationState {

private final ElectionStrategy electionStrategy;

// persisted state
private final PersistedState persistedState;
// persisted state registry
private final PersistedStateRegistry persistedStateRegistry;

// transient state
private VoteCollection joinVotes;
Expand All @@ -74,29 +78,38 @@ public class CoordinationState {
private long lastPublishedVersion;
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;

public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) {
private final boolean isRemoteStateEnabled;

public CoordinationState(
DiscoveryNode localNode,
PersistedStateRegistry persistedStateRegistry,
ElectionStrategy electionStrategy,
Settings settings
) {
this.localNode = localNode;

// persisted state
this.persistedState = persistedState;
// persisted state registry
this.persistedStateRegistry = persistedStateRegistry;
this.electionStrategy = electionStrategy;

// transient state
this.joinVotes = new VoteCollection();
this.startedJoinSinceLastReboot = false;
this.electionWon = false;
this.lastPublishedVersion = 0L;
this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration();
this.lastPublishedConfiguration = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings);
}

public long getCurrentTerm() {
return persistedState.getCurrentTerm();
return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getCurrentTerm();
}

public ClusterState getLastAcceptedState() {
return persistedState.getLastAcceptedState();
return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState();
}

public long getLastAcceptedTerm() {
Expand Down Expand Up @@ -186,7 +199,7 @@ public void setInitialState(ClusterState initialState) {
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
assert initialState.getLastCommittedConfiguration().isEmpty() == false;

persistedState.setLastAcceptedState(initialState);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(initialState);
}

/**
Expand Down Expand Up @@ -222,7 +235,7 @@ public Join handleStartJoin(StartJoinRequest startJoinRequest) {
logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason);
}

persistedState.setCurrentTerm(startJoinRequest.getTerm());
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setCurrentTerm(startJoinRequest.getTerm());
assert getCurrentTerm() == startJoinRequest.getTerm();
lastPublishedVersion = 0;
lastPublishedConfiguration = getLastAcceptedConfiguration();
Expand Down Expand Up @@ -436,7 +449,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.version(),
clusterState.term()
);
persistedState.setLastAcceptedState(clusterState);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -490,6 +503,7 @@ public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNo
publishResponse.getVersion(),
publishResponse.getTerm()
);
handlePreCommit();
return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion()));
}

Expand Down Expand Up @@ -547,10 +561,36 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
applyCommit.getVersion()
);

persistedState.markLastAcceptedStateAsCommitted();
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

/**
* This method should be called just before sending the PublishRequest to all cluster nodes.
* @param clusterState The cluster state for which pre publish activities should happen.
*/
public void handlePrePublish(ClusterState clusterState) {
// Publishing the current state to remote store before sending the cluster state to other nodes.
// This is to ensure the remote store is the single source of truth for current state. Even if the current node
// goes down after sending the cluster state to other nodes, we should be able to read the remote state and
// recover the cluster.
if (isRemoteStateEnabled) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
}
}

/**
* This method should be called just before sending the ApplyCommitRequest to all cluster nodes.
*/
public void handlePreCommit() {
// Publishing the committed state to remote store before sending apply commit to other nodes.
if (isRemoteStateEnabled) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
}

public void invariant() {
assert getLastAcceptedTerm() <= getCurrentTerm();
assert electionWon() == isElectionQuorum(joinVotes);
Expand All @@ -564,7 +604,7 @@ public void invariant() {
}

public void close() throws IOException {
persistedState.close();
IOUtils.close(persistedStateRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -201,7 +202,8 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -286,6 +288,7 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
}

Expand Down Expand Up @@ -820,8 +823,7 @@ boolean publicationInProgress() {
@Override
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down Expand Up @@ -1308,6 +1310,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
coordinationState.get().handlePrePublish(clusterState);
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.coordination.CoordinationState.PersistedState;
import org.opensearch.common.util.io.IOUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A class which encapsulates the PersistedStates
*
* @opensearch.internal
*/
public class PersistedStateRegistry implements Closeable {

public PersistedStateRegistry() {}

/**
* Distinct Types PersistedState which can be present on a node
*/
public enum PersistedStateType {
LOCAL,
REMOTE;
}

private final Map<PersistedStateType, PersistedState> persistedStates = new ConcurrentHashMap<>();

public void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) {
PersistedState existingState = this.persistedStates.putIfAbsent(persistedStateType, persistedState);
assert existingState == null : "should only be set once, but already have " + existingState;
}

public PersistedState getPersistedState(PersistedStateType persistedStateType) {
return this.persistedStates.get(persistedStateType);
}

@Override
public void close() throws IOException {
IOUtils.close(persistedStates.values());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -129,7 +130,8 @@ public DiscoveryModule(
Path configFile,
GatewayMetaState gatewayMetaState,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -205,7 +207,8 @@ public DiscoveryModule(
new Random(Randomness.get().nextLong()),
rerouteService,
electionStrategy,
nodeHealthService
nodeHealthService,
persistedStateRegistry
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Loading

0 comments on commit 6c887fd

Please sign in to comment.