From 6c887fdac090b37e7cb2c20d85f26ff9fcf1cd8e Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Mon, 4 Sep 2023 14:58:58 +0530 Subject: [PATCH] [Remote state] Integrate remote cluster state in publish/commit flow (#9665) * Integrate remote cluster state in publish/commit flow Signed-off-by: Sooraj Sinha Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 1 + .../coordination/CoordinationState.java | 68 +++++++++--- .../cluster/coordination/Coordinator.java | 9 +- .../coordination/PersistedStateRegistry.java | 52 +++++++++ .../opensearch/discovery/DiscoveryModule.java | 7 +- .../opensearch/gateway/GatewayMetaState.java | 37 ++++--- .../remote/RemoteClusterStateService.java | 15 +-- .../main/java/org/opensearch/node/Node.java | 24 ++++- .../coordination/CoordinationStateTests.java | 100 ++++++++++++++++-- .../coordination/CoordinatorTests.java | 2 +- .../cluster/coordination/NodeJoinTests.java | 8 +- .../coordination/PreVoteCollectorTests.java | 11 +- .../coordination/PublicationTests.java | 9 +- .../discovery/DiscoveryModuleTests.java | 4 +- .../GatewayMetaStatePersistedStateTests.java | 62 ++++++++++- .../snapshots/SnapshotResiliencyTests.java | 7 +- .../AbstractCoordinatorTestCase.java | 21 ++-- .../CoordinationStateTestCluster.java | 10 +- .../gateway/MockGatewayMetaState.java | 36 ++++++- .../opensearch/test/OpenSearchTestCase.java | 8 ++ 20 files changed, 419 insertions(+), 72 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 19e50e93eb2e6..35826a8313578 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592)) - Introduce cluster default remote 
translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584)) - Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670)) +- [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 08cd7d0ab02db..fa32618b96ef3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -35,8 +35,11 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; import java.io.Closeable; import java.io.IOException; @@ -49,6 +52,7 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; /** * The core class of the cluster state coordination algorithm, directly implementing the @@ -64,8 +68,8 @@ public class CoordinationState { private final ElectionStrategy electionStrategy; - // persisted state - private final PersistedState persistedState; + // persisted state registry + private final 
PersistedStateRegistry persistedStateRegistry; // transient state private VoteCollection joinVotes; @@ -74,12 +78,18 @@ public class CoordinationState { private long lastPublishedVersion; private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; - - public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) { + private final boolean isRemoteStateEnabled; + + public CoordinationState( + DiscoveryNode localNode, + PersistedStateRegistry persistedStateRegistry, + ElectionStrategy electionStrategy, + Settings settings + ) { this.localNode = localNode; - // persisted state - this.persistedState = persistedState; + // persisted state registry + this.persistedStateRegistry = persistedStateRegistry; this.electionStrategy = electionStrategy; // transient state @@ -87,16 +97,19 @@ public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, this.startedJoinSinceLastReboot = false; this.electionWon = false; this.lastPublishedVersion = 0L; - this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration(); + this.lastPublishedConfiguration = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) + .getLastAcceptedState() + .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); + this.isRemoteStateEnabled = REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings); } public long getCurrentTerm() { - return persistedState.getCurrentTerm(); + return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getCurrentTerm(); } public ClusterState getLastAcceptedState() { - return persistedState.getLastAcceptedState(); + return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState(); } public long getLastAcceptedTerm() { @@ -186,7 +199,7 @@ public void setInitialState(ClusterState initialState) { assert initialState.getLastAcceptedConfiguration().isEmpty() == false; 
assert initialState.getLastCommittedConfiguration().isEmpty() == false; - persistedState.setLastAcceptedState(initialState); + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(initialState); } /** @@ -222,7 +235,7 @@ public Join handleStartJoin(StartJoinRequest startJoinRequest) { logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason); } - persistedState.setCurrentTerm(startJoinRequest.getTerm()); + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setCurrentTerm(startJoinRequest.getTerm()); assert getCurrentTerm() == startJoinRequest.getTerm(); lastPublishedVersion = 0; lastPublishedConfiguration = getLastAcceptedConfiguration(); @@ -436,7 +449,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.version(), clusterState.term() ); - persistedState.setLastAcceptedState(clusterState); + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); assert getLastAcceptedState() == clusterState; return new PublishResponse(clusterState.term(), clusterState.version()); @@ -490,6 +503,7 @@ public Optional handlePublishResponse(DiscoveryNode sourceNo publishResponse.getVersion(), publishResponse.getTerm() ); + handlePreCommit(); return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion())); } @@ -547,10 +561,36 @@ public void handleCommit(ApplyCommitRequest applyCommit) { applyCommit.getVersion() ); - persistedState.markLastAcceptedStateAsCommitted(); + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted(); assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); } + /** + * This method should be called just before sending the PublishRequest to all cluster nodes. + * @param clusterState The cluster state for which pre publish activities should happen. 
+ */ + public void handlePrePublish(ClusterState clusterState) { + // Publishing the current state to remote store before sending the cluster state to other nodes. + // This is to ensure the remote store is the single source of truth for current state. Even if the current node + // goes down after sending the cluster state to other nodes, we should be able to read the remote state and + // recover the cluster. + if (isRemoteStateEnabled) { + assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); + } + } + + /** + * This method should be called just before sending the ApplyCommitRequest to all cluster nodes. + */ + public void handlePreCommit() { + // Publishing the committed state to remote store before sending apply commit to other nodes. + if (isRemoteStateEnabled) { + assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); + } + } + public void invariant() { assert getLastAcceptedTerm() <= getCurrentTerm(); assert electionWon() == isElectionQuorum(joinVotes); @@ -564,7 +604,7 @@ public void invariant() { } public void close() throws IOException { - persistedState.close(); + IOUtils.close(persistedStateRegistry); } /** diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 0274073ddfdc7..1559e77e1cf2d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -181,6 +181,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator 
joinAccumulator; private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; + private final PersistedStateRegistry persistedStateRegistry; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -201,7 +202,8 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + PersistedStateRegistry persistedStateRegistry ) { this.settings = settings; this.transportService = transportService; @@ -286,6 +288,7 @@ public Coordinator( joinHelper::logLastFailedJoinAttempt ); this.nodeHealthService = nodeHealthService; + this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; } @@ -820,8 +823,7 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); - coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy)); + coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -1308,6 +1310,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); + coordinationState.get().handlePrePublish(clusterState); publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java 
new file mode 100644 index 0000000000000..470ab02a682a8 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.coordination.CoordinationState.PersistedState; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A registry which encapsulates the PersistedState instances of a node, keyed by their PersistedStateType + * + * @opensearch.internal + */ +public class PersistedStateRegistry implements Closeable { + + public PersistedStateRegistry() {} + + /** + * Distinct types of PersistedState which can be present on a node + */ + public enum PersistedStateType { + LOCAL, + REMOTE; + } + + private final Map<PersistedStateType, PersistedState> persistedStates = new ConcurrentHashMap<>(); + + public void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) { + PersistedState existingState = this.persistedStates.putIfAbsent(persistedStateType, persistedState); + assert existingState == null : "should only be set once, but already have " + existingState; + } + + public PersistedState getPersistedState(PersistedStateType persistedStateType) { + return this.persistedStates.get(persistedStateType); + } + + @Override + public void close() throws IOException { + IOUtils.close(persistedStates.values()); + } + +} diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 68fce4d9b9bb4..58d8fe2e17fcf 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -37,6 +37,7 
@@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; @@ -129,7 +130,8 @@ public DiscoveryModule( Path configFile, GatewayMetaState gatewayMetaState, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + PersistedStateRegistry persistedStateRegistry ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -205,7 +207,8 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService + nodeHealthService, + persistedStateRegistry ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 02f1e5049b95c..4df18e4014c3d 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -45,6 +45,8 @@ import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationState.PersistedState; import org.opensearch.cluster.coordination.InMemoryPersistedState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Manifest; @@ -52,7 +54,6 @@ import 
org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.SetOnce; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -83,6 +84,7 @@ import java.util.function.UnaryOperator; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; /** * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. @@ -103,17 +105,16 @@ public class GatewayMetaState implements Closeable { */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; - // Set by calling start() - private final SetOnce persistedState = new SetOnce<>(); + private PersistedStateRegistry persistedStateRegistry; public PersistedState getPersistedState() { - final PersistedState persistedState = this.persistedState.get(); + final PersistedState persistedState = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); assert persistedState != null : "not started"; return persistedState; } public Metadata getMetadata() { - return getPersistedState().getLastAcceptedState().metadata(); + return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata(); } public void start( @@ -123,9 +124,12 @@ public void start( MetaStateService metaStateService, MetadataIndexUpgradeService metadataIndexUpgradeService, MetadataUpgrader metadataUpgrader, - PersistedClusterStateService persistedClusterStateService + PersistedClusterStateService persistedClusterStateService, + RemoteClusterStateService remoteClusterStateService, + PersistedStateRegistry persistedStateRegistry ) { - assert persistedState.get() == null : 
"should only start once, but already have " + persistedState.get(); + assert this.persistedStateRegistry == null : "Persisted state registry should only be set once"; + this.persistedStateRegistry = persistedStateRegistry; if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) { try { @@ -147,6 +151,7 @@ public void start( } PersistedState persistedState = null; + PersistedState remotePersistedState = null; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState( @@ -160,6 +165,9 @@ public void start( if (DiscoveryNode.isClusterManagerNode(settings)) { persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); + if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + remotePersistedState = new RemotePersistedState(remoteClusterStateService); + } } else { persistedState = new AsyncLucenePersistedState( settings, @@ -180,11 +188,14 @@ public void start( success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(persistedState); + IOUtils.closeWhileHandlingException(persistedStateRegistry); } } - this.persistedState.set(persistedState); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); + if (remotePersistedState != null) { + persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState); + } } catch (IOException e) { throw new OpenSearchException("failed to load metadata", e); } @@ -211,7 +222,7 @@ public void start( throw new UncheckedIOException(e); } } - persistedState.set(new InMemoryPersistedState(currentTerm, clusterState)); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState)); } } @@ -330,12 +341,14 @@ public void applyClusterState(ClusterChangedEvent event) { @Override public void close() throws IOException { - IOUtils.close(persistedState.get()); + 
IOUtils.close(persistedStateRegistry); } // visible for testing public boolean allPendingAsyncStatesWritten() { - final PersistedState ps = persistedState.get(); + // This method is invoked for persisted state implementations which write asynchronously. + // RemotePersistedState is invoked in synchronous path. So this logic is not required for remote state. + final PersistedState ps = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); if (ps instanceof AsyncLucenePersistedState) { return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten(); } else { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1008e889f510a..980c2f9cf3ce4 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -95,7 +95,7 @@ public class RemoteClusterStateService implements Closeable { private final String nodeId; private final Supplier repositoriesService; private final Settings settings; - private final LongSupplier relativeTimeMillisSupplier; + private final LongSupplier relativeTimeNanosSupplier; private BlobStoreRepository blobStoreRepository; private volatile TimeValue slowWriteLoggingThreshold; @@ -104,13 +104,13 @@ public RemoteClusterStateService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - LongSupplier relativeTimeMillisSupplier + LongSupplier relativeTimeNanosSupplier ) { assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; - this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; + this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.slowWriteLoggingThreshold = 
clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); } @@ -123,7 +123,7 @@ public RemoteClusterStateService( */ @Nullable public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException { - final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; @@ -149,7 +149,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro allUploadedIndexMetadata.add(uploadedIndexMetadata); } final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); - final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", @@ -181,7 +181,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { - final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. 
Exiting"); return null; @@ -233,7 +233,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), false ); - final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " @@ -283,6 +283,7 @@ void ensureRepositorySet() { if (blobStoreRepository != null) { return; } + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings); assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..e7c6eaebbf8ed 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -61,6 +61,7 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Metadata; @@ -127,6 +128,7 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import 
org.opensearch.identity.IdentityService; import org.opensearch.index.IndexModule; @@ -669,6 +671,18 @@ protected Node( clusterService.getClusterSettings(), threadPool::relativeTimeInMillis ); + final RemoteClusterStateService remoteClusterStateService; + if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + remoteClusterStateService = new RemoteClusterStateService( + nodeEnvironment.nodeId(), + repositoriesServiceReference::get, + settings, + clusterService.getClusterSettings(), + threadPool::preciseRelativeTimeInNanos + ); + } else { + remoteClusterStateService = null; + } // collect engine factory providers from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); @@ -874,6 +888,7 @@ protected Node( client, identityService ); + final PersistedStateRegistry persistedStateRegistry = new PersistedStateRegistry(); final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService( @@ -977,7 +992,8 @@ protected Node( environment.configDir(), gatewayMetaState, rerouteService, - fsHealthService + fsHealthService, + persistedStateRegistry ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1155,6 +1171,8 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); + b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); + b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); }); injector = modules.createInjector(); @@ -1302,7 +1320,9 @@ public Node start() throws NodeValidationException { injector.getInstance(MetaStateService.class), injector.getInstance(MetadataIndexUpgradeService.class), 
injector.getInstance(MetadataUpgrader.class), - injector.getInstance(PersistedClusterStateService.class) + injector.getInstance(PersistedClusterStateService.class), + injector.getInstance(RemoteClusterStateService.class), + injector.getInstance(PersistedStateRegistry.class) ); if (Assertions.ENABLED) { try { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 45e71138abf99..02fae835d7cbf 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; import org.opensearch.cluster.coordination.CoordinationState.PersistedState; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -44,19 +45,26 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.Assertions; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.IOException; import java.util.Collections; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.mockito.Mockito; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static 
org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class CoordinationStateTests extends OpenSearchTestCase { @@ -67,6 +75,7 @@ public class CoordinationStateTests extends OpenSearchTestCase { private ClusterState initialStateNode1; private PersistedState ps1; + private PersistedStateRegistry psr1; private CoordinationState cs1; private CoordinationState cs2; @@ -97,10 +106,12 @@ public void setupNodes() { ); ps1 = new InMemoryPersistedState(0L, initialStateNode1); + psr1 = persistedStateRegistry(); + psr1.addPersistedState(PersistedStateType.LOCAL, ps1); - cs1 = createCoordinationState(ps1, node1); - cs2 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode2), node2); - cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3); + cs1 = createCoordinationState(psr1, node1, Settings.EMPTY); + cs2 = createCoordinationState(createPersistedStateRegistry(initialStateNode2), node2, Settings.EMPTY); + cs3 = createCoordinationState(createPersistedStateRegistry(initialStateNode3), node3, Settings.EMPTY); } public static DiscoveryNode createNode(String id) { @@ -200,7 +211,7 @@ public void testJoinBeforeBootstrap() { public void testJoinWithNoStartJoinAfterReboot() { StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); Join v1 = cs1.handleStartJoin(startJoinRequest1); - cs1 = createCoordinationState(ps1, node1); + cs1 = createCoordinationState(psr1, node1, Settings.EMPTY); assertThat( expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(v1)).getMessage(), containsString("ignored join as term has not been incremented yet after reboot") @@ -886,8 +897,79 @@ public void testSafety() { ).runRandomly(); } - public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) { - return new CoordinationState(localNode, storage, 
ElectionStrategy.DEFAULT_INSTANCE); + public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); + final PersistedStateRegistry persistedStateRegistrySpy = Mockito.spy(persistedStateRegistry); + final CoordinationState coordinationState = createCoordinationState(persistedStateRegistrySpy, node1, Settings.EMPTY); + final VotingConfiguration initialConfig = VotingConfiguration.of(node1); + final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); + coordinationState.handlePrePublish(clusterState); + Mockito.verify(persistedStateRegistrySpy, Mockito.times(0)).getPersistedState(PersistedStateType.REMOTE); + assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), nullValue()); + final ClusterState clusterState2 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + coordinationState.handlePrePublish(clusterState2); + Mockito.verify(persistedStateRegistrySpy, Mockito.times(0)).getPersistedState(PersistedStateType.REMOTE); + assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), nullValue()); + coordinationState.handlePreCommit(); + Mockito.verify(persistedStateRegistrySpy, Mockito.times(0)).getPersistedState(PersistedStateType.REMOTE); + assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), nullValue()); + } + + public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final VotingConfiguration initialConfig = VotingConfiguration.of(node1); + final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState)) + .thenReturn( + new 
ClusterMetadataManifest( + 0L, + 0L, + randomAlphaOfLength(10), + randomAlphaOfLength(10), + Version.CURRENT, + randomAlphaOfLength(10), + false, + Collections.emptyList() + ) + ); + + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); + persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, new RemotePersistedState(remoteClusterStateService)); + final Settings settings = Settings.builder() + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings); + coordinationState.handlePrePublish(clusterState); + Mockito.verifyNoInteractions(remoteClusterStateService); + assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); + + final ClusterState clusterState2 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + final ClusterMetadataManifest manifest2 = new ClusterMetadataManifest( + 0L, + 1L, + randomAlphaOfLength(10), + randomAlphaOfLength(10), + Version.CURRENT, + randomAlphaOfLength(10), + false, + Collections.emptyList() + ); + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState2)).thenReturn(manifest2); + coordinationState.handlePrePublish(clusterState2); + Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState2); + + coordinationState.handlePreCommit(); + Mockito.verify(remoteClusterStateService, Mockito.times(1)).markLastStateAsCommitted(clusterState2, manifest2); + } + + public static CoordinationState createCoordinationState( + PersistedStateRegistry persistedStateRegistry, + DiscoveryNode localNode, + Settings settings + ) { + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); } public static ClusterState 
clusterState( @@ -950,4 +1032,10 @@ public static ClusterState setValue(ClusterState clusterState, long value) { public static long value(ClusterState clusterState) { return clusterState.metadata().persistentSettings().getAsLong("value", 0L); } + + private static PersistedStateRegistry createPersistedStateRegistry(ClusterState clusterState) { + final PersistedStateRegistry persistedStateRegistry = new PersistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, clusterState)); + return persistedStateRegistry; + } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java index d08a80208c533..a3129655148ab 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java @@ -1256,7 +1256,7 @@ public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessExcepti final ClusterNode newNode = cluster1.new ClusterNode( nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.new MockPersistedState( - n, nodeInOtherCluster.persistedState, Function.identity(), Function.identity() + n, nodeInOtherCluster.persistedStateRegistry, Function.identity(), Function.identity() ), nodeInOtherCluster.nodeSettings, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info") ); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index ab91099cae11f..8d798b38dc023 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import 
org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; import org.opensearch.cluster.decommission.DecommissionStatus; @@ -245,6 +246,8 @@ protected void onSendRequest( clusterSettings, Collections.emptySet() ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(term, initialState)); coordinator = new Coordinator( "test_node", Settings.EMPTY, @@ -253,14 +256,15 @@ protected void onSendRequest( writableRegistry(), OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY), clusterManagerService, - () -> new InMemoryPersistedState(term, initialState), + () -> persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL), r -> emptyList(), new NoOpClusterApplier(), Collections.emptyList(), random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, - nodeHealthService + nodeHealthService, + persistedStateRegistry ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 8b35856f0fb4c..8f779097d50d4 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -35,6 +35,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import 
org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; @@ -290,10 +291,16 @@ public void testPrevotingIndicatesElectionSuccess() { DiscoveryNode[] votingNodes = votingNodesSet.toArray(new DiscoveryNode[0]); startAndRunCollector(votingNodes); + PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState( + PersistedStateType.LOCAL, + new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)) + ); final CoordinationState coordinationState = new CoordinationState( localNode, - new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)), - ElectionStrategy.DEFAULT_INSTANCE + persistedStateRegistry, + ElectionStrategy.DEFAULT_INSTANCE, + Settings.EMPTY ); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index 79c141aa69b9f..4d18ff95887dd 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -35,6 +35,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; @@ -91,11 +92,9 @@ class MockNode { CoordinationMetadata.VotingConfiguration.EMPTY_CONFIG, 0L ); - coordinationState = new CoordinationState( - localNode, - new InMemoryPersistedState(0L, initialState), - 
ElectionStrategy.DEFAULT_INSTANCE - ); + PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); + coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); } final DiscoveryNode localNode; diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 4a987c9a6fe02..b32dd7c6c240b 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -34,6 +34,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.service.ClusterApplier; @@ -120,7 +121,8 @@ private DiscoveryModule newModule(Settings settings, List plugi createTempDir().toAbsolutePath(), gatewayMetaState, mock(RerouteService.class), - null + null, + new PersistedStateRegistry() ); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 47fea55242240..c293768151fa4 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -42,6 +42,8 @@ import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.coordination.CoordinationState; +import 
org.opensearch.cluster.coordination.PersistedStateRegistry; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Manifest; import org.opensearch.cluster.metadata.Metadata; @@ -64,6 +66,7 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -78,6 +81,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.mockito.Mockito; @@ -91,6 +95,7 @@ import static org.mockito.Mockito.when; public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { + private NodeEnvironment nodeEnvironment; private ClusterName clusterName; private Settings settings; @@ -121,9 +126,15 @@ public void tearDown() throws Exception { private CoordinationState.PersistedState newGatewayPersistedState() { final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); - gateway.start(settings, nodeEnvironment, xContentRegistry()); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + gateway.start(settings, nodeEnvironment, xContentRegistry(), persistedStateRegistry); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class)); + assertThat( + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL), + instanceOf(GatewayMetaState.LucenePersistedState.class) + ); + assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), nullValue()); return persistedState; } 
@@ -433,6 +444,26 @@ public void testDataOnlyNodePersistence() throws Exception { new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ); + Supplier remoteClusterStateServiceSupplier = () -> { + if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + return new RemoteClusterStateService( + nodeEnvironment.nodeId(), + () -> new RepositoriesService( + settings, + clusterService, + transportService, + Collections.emptyMap(), + Collections.emptyMap(), + transportService.getThreadPool() + ), + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ); + } else { + return null; + } + }; gateway.start( settings, transportService, @@ -440,7 +471,9 @@ public void testDataOnlyNodePersistence() throws Exception { new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, - persistedClusterStateService + persistedClusterStateService, + remoteClusterStateServiceSupplier.get(), + new PersistedStateRegistry() ); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class)); @@ -719,6 +752,31 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(secondClusterState)); } + public void testGatewayForRemoteState() throws IOException { + MockGatewayMetaState gateway = null; + try { + gateway = new MockGatewayMetaState(localNode, bigArrays); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + final Settings settingWithRemoteStateEnabled = Settings.builder() + .put(settings) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + gateway.start(settingWithRemoteStateEnabled, nodeEnvironment, xContentRegistry(), persistedStateRegistry); + final 
CoordinationState.PersistedState persistedState = gateway.getPersistedState(); + assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class)); + assertThat( + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL), + instanceOf(GatewayMetaState.LucenePersistedState.class) + ); + assertThat( + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), + instanceOf(GatewayMetaState.RemotePersistedState.class) + ); + } finally { + IOUtils.close(gateway); + } + } + private static BigArrays getBigArrays() { return usually() ? BigArrays.NON_RECYCLING_INSTANCE diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a1cedbb6e24c0..81e3b74c40104 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -130,6 +130,8 @@ import org.opensearch.cluster.coordination.ElectionStrategy; import org.opensearch.cluster.coordination.InMemoryPersistedState; import org.opensearch.cluster.coordination.MockSinglePrioritizingExecutor; +import org.opensearch.cluster.coordination.PersistedStateRegistry; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -2488,6 +2490,8 @@ public void start(ClusterState initialState) { initialState.term(), stateForNode(initialState, node) ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); coordinator = new Coordinator( node.getName(), clusterService.getSettings(), @@ -2507,7 +2511,8 @@ public void start(ClusterState initialState) { random(), rerouteService, 
ElectionStrategy.DEFAULT_INSTANCE, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + persistedStateRegistry ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8fac407547a9d..c6960b5ca33ff 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; import org.opensearch.cluster.coordination.LinearizabilityChecker.History; import org.opensearch.cluster.coordination.LinearizabilityChecker.SequentialSpec; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -846,7 +847,7 @@ class MockPersistedState implements CoordinationState.PersistedState { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); - gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); + gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; @@ -864,11 +865,12 @@ class MockPersistedState implements CoordinationState.PersistedState { MockPersistedState( DiscoveryNode newLocalNode, - MockPersistedState oldState, + PersistedStateRegistry persistedStateRegistry, Function adaptGlobalMetadata, Function 
adaptCurrentTerm ) { try { + MockPersistedState oldState = (MockPersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); if (oldState.nodeEnvironment != null) { nodeEnvironment = oldState.nodeEnvironment; final Metadata updatedMetadata = adaptGlobalMetadata.apply(oldState.getLastAcceptedState().metadata()); @@ -890,7 +892,7 @@ class MockPersistedState implements CoordinationState.PersistedState { } } final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode, bigArrays); - gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); + gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; @@ -1025,7 +1027,7 @@ class ClusterNode { private final int nodeIndex; Coordinator coordinator; private final DiscoveryNode localNode; - final MockPersistedState persistedState; + final PersistedStateRegistry persistedStateRegistry; final Settings nodeSettings; private AckedFakeThreadPoolClusterManagerService clusterManagerService; private DisruptableClusterApplierService clusterApplierService; @@ -1056,7 +1058,9 @@ class ClusterNode { this.nodeIndex = nodeIndex; this.localNode = localNode; this.nodeSettings = nodeSettings; - persistedState = persistedStateSupplier.apply(localNode); + final MockPersistedState persistedState = persistedStateSupplier.apply(localNode); + persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); assertTrue("must use a fresh PersistedState", openPersistedStates.add(persistedState)); boolean success = false; try { @@ -1144,7 +1148,8 @@ protected Optional getDisruptableMockTransport(Transpo Randomness.get(), (s, p, r) -> {}, getElectionStrategy(), - nodeHealthService + nodeHealthService, + persistedStateRegistry ); clusterManagerService.setClusterStatePublisher(coordinator); final 
GatewayService gatewayService = new GatewayService( @@ -1204,14 +1209,14 @@ ClusterNode restartedNode( return new ClusterNode( nodeIndex, newLocalNode, - node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), + node -> new MockPersistedState(newLocalNode, persistedStateRegistry, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings, nodeHealthService ); } private CoordinationState.PersistedState getPersistedState() { - return persistedState; + return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); } String getId() { diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index 9f7802a401391..cbe695cbb2136 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -34,6 +34,7 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -128,6 +129,8 @@ static class ClusterNode { DiscoveryNode localNode; CoordinationState.PersistedState persistedState; + PersistedStateRegistry persistedStateRegistry; + CoordinationState state; ClusterNode(DiscoveryNode localNode, ElectionStrategy electionStrategy) { @@ -143,8 +146,11 @@ static class ClusterNode { 0L ) ); + persistedStateRegistry = new PersistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); + this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedState, electionStrategy); + state = 
new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void reboot() { @@ -183,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedState, electionStrategy); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) { diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index 6a3748e55394e..14d646cf84a0a 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -33,6 +33,7 @@ package org.opensearch.gateway; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.Manifest; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; @@ -44,11 +45,15 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.plugins.MetadataUpgrader; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import java.util.function.Supplier; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -84,7 +89,12 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust return ClusterStateUpdaters.setLocalNode(clusterState, localNode); } - public void start(Settings settings, NodeEnvironment 
nodeEnvironment, NamedXContentRegistry xContentRegistry) { + public void start( + Settings settings, + NodeEnvironment nodeEnvironment, + NamedXContentRegistry xContentRegistry, + PersistedStateRegistry persistedStateRegistry + ) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(mock(ThreadPool.class)); final ClusterService clusterService = mock(ClusterService.class); @@ -97,6 +107,26 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont } catch (IOException e) { throw new AssertionError(e); } + Supplier remoteClusterStateServiceSupplier = () -> { + if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) { + return new RemoteClusterStateService( + nodeEnvironment.nodeId(), + () -> new RepositoriesService( + settings, + clusterService, + transportService, + Collections.emptyMap(), + Collections.emptyMap(), + transportService.getThreadPool() + ), + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ); + } else { + return null; + } + }; start( settings, transportService, @@ -110,7 +140,9 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont bigArrays, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L - ) + ), + remoteClusterStateServiceSupplier.get(), + persistedStateRegistry ); } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index d9b3bd9109f23..8490ee4fc39bc 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -65,6 +65,7 @@ import org.opensearch.bootstrap.BootstrapForTesting; import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterModule; +import 
org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.CheckedRunnable; @@ -1543,6 +1544,13 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); } + /** + * The {@link PersistedStateRegistry} to use for this test. Subclasses should override and use liberally. + */ + protected PersistedStateRegistry persistedStateRegistry() { + return new PersistedStateRegistry(); + } + /** * Create a "mock" script for use either with {@link MockScriptEngine} or anywhere where you need a script but don't really care about * its contents.