diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 4d542566ccd70..dff6b5add0b09 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -422,7 +422,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) { logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(), applyCommit.getVersion()); - persistedState.markLastAcceptedConfigAsCommitted(); + persistedState.markLastAcceptedStateAsCommitted(); assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); } @@ -471,16 +471,32 @@ public interface PersistedState { /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, - * with the last committed configuration now corresponding to the last accepted configuration. + * with the last committed configuration now corresponding to the last accepted configuration, and the cluster uuid, if set, + * marked as committed. */ - default void markLastAcceptedConfigAsCommitted() { + default void markLastAcceptedStateAsCommitted() { final ClusterState lastAcceptedState = getLastAcceptedState(); + MetaData.Builder metaDataBuilder = null; if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData()) .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) .build(); - final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build(); - setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build()); + metaDataBuilder = MetaData.builder(lastAcceptedState.metaData()); + metaDataBuilder.coordinationMetaData(coordinationMetaData); + } + // if we receive a commit from a Zen1 master that has not recovered its state yet, the cluster uuid might not been known yet. + assert lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false || + lastAcceptedState.term() == ZEN1_BWC_TERM : + "received cluster state with empty cluster uuid but not Zen1 BWC term: " + lastAcceptedState; + if (lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false && + lastAcceptedState.metaData().clusterUUIDCommitted() == false) { + if (metaDataBuilder == null) { + metaDataBuilder = MetaData.builder(lastAcceptedState.metaData()); + } + metaDataBuilder.clusterUUIDCommitted(true); + } + if (metaDataBuilder != null) { + setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build()); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 4bf977f8398ce..dff9cdcb8a2a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -147,7 +147,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.masterService = masterService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, - this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); + this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); this.persistedStateSupplier = persistedStateSupplier; this.discoverySettings = new DiscoverySettings(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); @@ -281,7 +281,18 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { + lastKnownLeader + ", rejecting"); } - if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) { + final ClusterState localState = coordinationState.get().getLastAcceptedState(); + + if (localState.metaData().clusterUUIDCommitted() && + localState.metaData().clusterUUID().equals(publishRequest.getAcceptedState().metaData().clusterUUID()) == false) { + logger.warn("received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting", + sourceNode, publishRequest.getAcceptedState().metaData().clusterUUID(), localState.metaData().clusterUUID()); + throw new CoordinationStateRejectedException("received cluster state from " + sourceNode + + " with a different cluster uuid " + publishRequest.getAcceptedState().metaData().clusterUUID() + + " than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting"); + } + + if (publishRequest.getAcceptedState().term() > localState.term()) { // only do join validation if we have not accepted state from this master yet onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState())); } @@ -653,6 +664,7 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID); + assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted(); assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector + " vs " + getPreVoteResponse(); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 53fada396fcef..a9309e9fe638a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -62,6 +62,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; public class JoinHelper { @@ -84,7 +85,7 @@ public class JoinHelper { final Set> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet(); public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, - TransportService transportService, LongSupplier currentTermSupplier, + TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, Collection> joinValidators) { this.masterService = masterService; @@ -132,6 +133,13 @@ public ClusterTasksResult execute(ClusterState currentSta transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME, MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { + final ClusterState localState = currentStateSupplier.get(); + if (localState.metaData().clusterUUIDCommitted() && + localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) { + throw new CoordinationStateRejectedException("join validation on cluster state" + + " with a different cluster uuid " + request.getState().metaData().clusterUUID() + + " than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting"); + } joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); channel.sendResponse(Empty.INSTANCE); }); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 3cce3f791d2b8..54c3001d9036f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -88,6 +88,7 @@ public class MetaData implements Iterable, Diffable, To private static final Logger logger = LogManager.getLogger(MetaData.class); public static final String ALL = "_all"; + public static final String UNKNOWN_CLUSTER_UUID = "_na_"; public enum XContentContext { /* Custom metadata should be returns as part of API call */ @@ -159,6 +160,7 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); private final String clusterUUID; + private final boolean clusterUUIDCommitted; private final long version; private final CoordinationMetaData coordinationMetaData; @@ -179,12 +181,13 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust private final SortedMap aliasAndIndexLookup; - MetaData(String clusterUUID, long version, CoordinationMetaData coordinationMetaData, + MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData, Settings transientSettings, Settings persistentSettings, ImmutableOpenMap indices, ImmutableOpenMap templates, ImmutableOpenMap customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices, SortedMap aliasAndIndexLookup) { this.clusterUUID = clusterUUID; + this.clusterUUIDCommitted = clusterUUIDCommitted; this.version = version; this.coordinationMetaData = coordinationMetaData; this.transientSettings = transientSettings; @@ -218,6 +221,14 @@ public String clusterUUID() { return this.clusterUUID; } + /** + * Whether the current node with the given cluster state is locked into the cluster with the UUID returned by {@link #clusterUUID()}, + * meaning that it will not accept any cluster state with a different clusterUUID. + */ + public boolean clusterUUIDCommitted() { + return this.clusterUUIDCommitted; + } + /** * Returns the merged transient and persistent settings. */ @@ -757,6 +768,12 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2 if (!metaData1.templates.equals(metaData2.templates())) { return false; } + if (!metaData1.clusterUUID.equals(metaData2.clusterUUID)) { + return false; + } + if (metaData1.clusterUUIDCommitted != metaData2.clusterUUIDCommitted) { + return false; + } // Check if any persistent metadata needs to be saved int customCount1 = 0; for (ObjectObjectCursor cursor : metaData1.customs) { @@ -798,6 +815,7 @@ private static class MetaDataDiff implements Diff { private long version; private String clusterUUID; + private boolean clusterUUIDCommitted; private CoordinationMetaData coordinationMetaData; private Settings transientSettings; private Settings persistentSettings; @@ -807,6 +825,7 @@ private static class MetaDataDiff implements Diff { MetaDataDiff(MetaData before, MetaData after) { clusterUUID = after.clusterUUID; + clusterUUIDCommitted = after.clusterUUIDCommitted; version = after.version; coordinationMetaData = after.coordinationMetaData; transientSettings = after.transientSettings; @@ -818,8 +837,11 @@ private static class MetaDataDiff implements Diff { MetaDataDiff(StreamInput in) throws IOException { clusterUUID = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + clusterUUIDCommitted = in.readBoolean(); + } version = in.readLong(); - if (in.getVersion().onOrAfter(Version.V_7_0_0)) { //TODO revisit after Zen2 BWC is implemented + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { coordinationMetaData = new CoordinationMetaData(in); } else { coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA; @@ -836,6 +858,9 @@ private static class MetaDataDiff implements Diff { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterUUID); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(clusterUUIDCommitted); + } out.writeLong(version); if (out.getVersion().onOrAfter(Version.V_7_0_0)) { coordinationMetaData.writeTo(out); @@ -851,6 +876,7 @@ public void writeTo(StreamOutput out) throws IOException { public MetaData apply(MetaData part) { Builder builder = builder(); builder.clusterUUID(clusterUUID); + builder.clusterUUIDCommitted(clusterUUIDCommitted); builder.version(version); builder.coordinationMetaData(coordinationMetaData); builder.transientSettings(transientSettings); @@ -866,6 +892,9 @@ public static MetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(); builder.version = in.readLong(); builder.clusterUUID = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + builder.clusterUUIDCommitted = in.readBoolean(); + } if (in.getVersion().onOrAfter(Version.V_7_0_0)) { builder.coordinationMetaData(new CoordinationMetaData(in)); } @@ -891,6 +920,9 @@ public static MetaData readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeLong(version); out.writeString(clusterUUID); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(clusterUUIDCommitted); + } if (out.getVersion().onOrAfter(Version.V_7_0_0)) { coordinationMetaData.writeTo(out); } @@ -930,6 +962,7 @@ public static Builder builder(MetaData metaData) { public static class Builder { private String clusterUUID; + private boolean clusterUUIDCommitted; private long version; private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA; @@ -941,7 +974,7 @@ public static class Builder { private final ImmutableOpenMap.Builder customs; public Builder() { - clusterUUID = "_na_"; + clusterUUID = UNKNOWN_CLUSTER_UUID; indices = ImmutableOpenMap.builder(); templates = ImmutableOpenMap.builder(); customs = ImmutableOpenMap.builder(); @@ -950,6 +983,7 @@ public Builder() { public Builder(MetaData metaData) { this.clusterUUID = metaData.clusterUUID; + this.clusterUUIDCommitted = metaData.clusterUUIDCommitted; this.coordinationMetaData = metaData.coordinationMetaData; this.transientSettings = metaData.transientSettings; this.persistentSettings = metaData.persistentSettings; @@ -1125,8 +1159,13 @@ public Builder clusterUUID(String clusterUUID) { return this; } + public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { + this.clusterUUIDCommitted = clusterUUIDCommitted; + return this; + } + public Builder generateClusterUuidIfNeeded() { - if (clusterUUID.equals("_na_")) { + if (clusterUUID.equals(UNKNOWN_CLUSTER_UUID)) { clusterUUID = UUIDs.randomBase64UUID(); } return this; @@ -1182,8 +1221,9 @@ public MetaData build() { String[] allOpenIndicesArray = allOpenIndices.toArray(new String[allOpenIndices.size()]); String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]); - return new MetaData(clusterUUID, version, coordinationMetaData, transientSettings, persistentSettings, indices.build(), - templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, aliasAndIndexLookup); + return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings, + indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, + aliasAndIndexLookup); } private SortedMap buildAliasAndIndexLookup() { @@ -1226,6 +1266,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon builder.field("version", metaData.version()); builder.field("cluster_uuid", metaData.clusterUUID); + builder.field("cluster_uuid_committed", metaData.clusterUUIDCommitted); builder.startObject("cluster_coordination"); metaData.coordinationMetaData().toXContent(builder, params); @@ -1324,6 +1365,8 @@ public static MetaData fromXContent(XContentParser parser) throws IOException { builder.version = parser.longValue(); } else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) { builder.clusterUUID = parser.text(); + } else if ("cluster_uuid_committed".equals(currentFieldName)) { + builder.clusterUUIDCommitted = parser.booleanValue(); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 36495914bddec..c3028de1801da 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -20,6 +20,7 @@ import com.carrotsearch.randomizedtesting.RandomizedContext; import org.apache.logging.log4j.CloseableThreadContext; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; +import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; @@ -48,6 +50,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -59,9 +62,11 @@ import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.transport.TransportService; @@ -84,6 +89,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -137,6 +143,13 @@ public class CoordinatorTests extends ESTestCase { private final List nodeEnvironments = new ArrayList<>(); + private final AtomicInteger nextNodeIndex = new AtomicInteger(); + + @Before + public void resetNodeIndexBeforeEachTest() { + nextNodeIndex.set(0); + } + @After public void closeNodeEnvironmentsAfterEachTest() { for (NodeEnvironment nodeEnvironment : nodeEnvironments) { @@ -153,6 +166,7 @@ public void resetPortCounterBeforeEachTest() { // check that runRandomly leads to reproducible results public void testRepeatableTests() throws Exception { final Callable test = () -> { + resetNodeIndexBeforeEachTest(); final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); final long afterRunRandomly = value(cluster.getAnyNode().getLastAppliedClusterState()); @@ -1001,6 +1015,52 @@ public void testClusterCannotFormWithFailingJoinValidation() { assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0)); } + public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessException { + final Cluster cluster1 = new Cluster(randomIntBetween(1, 3)); + cluster1.runRandomly(); + cluster1.stabilise(); + + final Cluster cluster2 = new Cluster(3); + cluster2.runRandomly(); + cluster2.stabilise(); + + final ClusterNode shiftedNode = randomFrom(cluster2.clusterNodes).restartedNode(); + final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(), + shiftedNode.getLocalNode(), n -> shiftedNode.persistedState); + cluster1.clusterNodes.add(newNode); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1", + JoinHelper.class.getCanonicalName(), + Level.INFO, + "*failed to join*")); + Logger joinLogger = LogManager.getLogger(JoinHelper.class); + Loggers.addAppender(joinLogger, mockAppender); + cluster1.runFor(DEFAULT_STABILISATION_TIME, "failing join validation"); + try { + mockAppender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(joinLogger, mockAppender); + mockAppender.stop(); + } + assertTrue(newNode.getLastAppliedClusterState().version() == 0); + + // reset clusterUUIDCommitted (and node / cluster state term) to let node join again + // TODO: use elasticsearch-node detach-cluster tool once it's implemented + final ClusterNode detachedNode = newNode.restartedNode( + metaData -> MetaData.builder(metaData) + .clusterUUIDCommitted(false) + .coordinationMetaData(CoordinationMetaData.builder(metaData.coordinationMetaData()) + .term(0L).build()) + .build(), + term -> 0L); + cluster1.clusterNodes.replaceAll(cn -> cn == newNode ? detachedNode : cn); + cluster1.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -1077,7 +1137,8 @@ class Cluster { final Set masterEligibleNodeIds = new HashSet<>(initialNodeCount); clusterNodes = new ArrayList<>(initialNodeCount); for (int i = 0; i < initialNodeCount; i++) { - final ClusterNode clusterNode = new ClusterNode(i, allNodesMasterEligible || i == 0 || randomBoolean()); + final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), + allNodesMasterEligible || i == 0 || randomBoolean()); clusterNodes.add(clusterNode); if (clusterNode.getLocalNode().isMasterNode()) { masterEligibleNodeIds.add(clusterNode.getId()); @@ -1108,10 +1169,9 @@ List addNodesAndStabilise(int newNodesCount) { List addNodes(int newNodesCount) { logger.info("--> adding {} nodes", newNodesCount); - final int nodeSizeAtStart = clusterNodes.size(); final List addedNodes = new ArrayList<>(); for (int i = 0; i < newNodesCount; i++) { - final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true); + final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true); addedNodes.add(clusterNode); } clusterNodes.addAll(addedNodes); @@ -1471,21 +1531,41 @@ class MockPersistedState implements PersistedState { } } - MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState) { + MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState, + Function adaptGlobalMetaData, Function adaptCurrentTerm) { try { if (oldState.nodeEnvironment != null) { nodeEnvironment = oldState.nodeEnvironment; + final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry()); + final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData()); + if (updatedMetaData != oldState.getLastAcceptedState().metaData()) { + metaStateService.writeGlobalStateAndUpdateManifest("update global state", updatedMetaData); + } + final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm()); + if (updatedTerm != oldState.getCurrentTerm()) { + final Manifest manifest = metaStateService.loadManifestOrEmpty(); + metaStateService.writeManifestAndCleanup("update term", + new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(), + manifest.getIndexGenerations())); + } delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode) .getPersistedState(Settings.EMPTY, null); } else { nodeEnvironment = null; BytesStreamOutput outStream = new BytesStreamOutput(); outStream.setVersion(Version.CURRENT); - oldState.getLastAcceptedState().writeTo(outStream); + final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData()); + final ClusterState clusterState; + if (updatedMetaData != oldState.getLastAcceptedState().metaData()) { + clusterState = ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build(); + } else { + clusterState = oldState.getLastAcceptedState(); + } + clusterState.writeTo(outStream); StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); - delegate = new InMemoryPersistedState(oldState.getCurrentTerm(), ClusterState.readFrom(inStream, - newLocalNode)); // adapts it to new localNode instance + delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()), + ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); @@ -1614,12 +1694,17 @@ void close() { } ClusterNode restartedNode() { + return restartedNode(Function.identity(), Function.identity()); + } + + ClusterNode restartedNode(Function adaptGlobalMetaData, Function adaptCurrentTerm) { final TransportAddress address = randomBoolean() ? buildNewFakeTransportAddress() : localNode.getAddress(); final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(), UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT); - return new ClusterNode(nodeIndex, newLocalNode, node -> new MockPersistedState(newLocalNode, persistedState)); + return new ClusterNode(nodeIndex, newLocalNode, + node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm)); } private PersistedState getPersistedState() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index ef843717fb469..4361660876c7a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -43,7 +43,7 @@ public void testJoinDeduplication() { TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> localNode, null, Collections.emptySet()); - JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, + JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList()); transportService.start(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java index 5dcfccaea5874..685b7cca98a94 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java @@ -411,6 +411,43 @@ public void testXContentWithIndexGraveyard() throws IOException { } } + public void testXContentClusterUUID() throws IOException { + final MetaData originalMeta = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID()) + .clusterUUIDCommitted(randomBoolean()).build(); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalMeta.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final MetaData fromXContentMeta = MetaData.fromXContent(parser); + assertThat(fromXContentMeta.clusterUUID(), equalTo(originalMeta.clusterUUID())); + assertThat(fromXContentMeta.clusterUUIDCommitted(), equalTo(originalMeta.clusterUUIDCommitted())); + } + } + + public void testSerializationClusterUUID() throws IOException { + final MetaData originalMeta = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID()) + .clusterUUIDCommitted(randomBoolean()).build(); + final BytesStreamOutput out = new BytesStreamOutput(); + originalMeta.writeTo(out); + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + final MetaData fromStreamMeta = MetaData.readFrom( + new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry) + ); + assertThat(fromStreamMeta.clusterUUID(), equalTo(originalMeta.clusterUUID())); + assertThat(fromStreamMeta.clusterUUIDCommitted(), equalTo(originalMeta.clusterUUIDCommitted())); + } + + public void testMetaDataGlobalStateChangesOnClusterUUIDChanges() { + final MetaData metaData1 = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID()).clusterUUIDCommitted(randomBoolean()).build(); + final MetaData metaData2 = MetaData.builder(metaData1).clusterUUID(UUIDs.randomBase64UUID()).build(); + final MetaData metaData3 = MetaData.builder(metaData1).clusterUUIDCommitted(!metaData1.clusterUUIDCommitted()).build(); + assertFalse(MetaData.isGlobalStateEquals(metaData1, metaData2)); + assertFalse(MetaData.isGlobalStateEquals(metaData1, metaData3)); + final MetaData metaData4 = MetaData.builder(metaData2).clusterUUID(metaData1.clusterUUID()).build(); + assertTrue(MetaData.isGlobalStateEquals(metaData1, metaData4)); + } + private static CoordinationMetaData.VotingConfiguration randomVotingConfig() { return new CoordinationMetaData.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(randomInt(10), 20, false))); } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index d94c34c7b33eb..330c73b9c02c5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.coordination.ClusterBootstrapService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.ShardRouting; @@ -377,6 +378,33 @@ public boolean clearData(String nodeName) { assertTrue(client().prepareGet("index", "_doc", "1").get().isExists()); } + public void testCannotJoinIfMasterLostDataFolder() throws Exception { + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + + internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() { + @Override + public boolean clearData(String nodeName) { + return true; + } + + @Override + public Settings onNodeStopped(String nodeName) { + return Settings.builder().put(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), nodeName).build(); + } + + @Override + public boolean validateClusterForming() { + return false; + } + }); + + assertFalse(internalCluster().client(masterNode).admin().cluster().prepareHealth().get().isTimedOut()); + assertTrue(internalCluster().client(masterNode).admin().cluster().prepareHealth().setWaitForNodes("2").setTimeout("2s").get() + .isTimedOut()); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode)); // otherwise we will fail during clean-up + } + /** * Tests that indices are properly deleted even if there is a master transition in between. * Test for https://github.com/elastic/elasticsearch/issues/11665 diff --git a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java index b34bcf87bdbd8..cae33db90a6bc 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java @@ -263,12 +263,12 @@ public void testMixCurrentAndRecoveredState() { .blocks(ClusterBlocks.builder().addGlobalBlock(CLUSTER_READ_ONLY_BLOCK).build()) .metaData(metaData) .build(); - assertThat(recoveredState.metaData().clusterUUID(), equalTo("_na_")); + assertThat(recoveredState.metaData().clusterUUID(), equalTo(MetaData.UNKNOWN_CLUSTER_UUID)); final ClusterState updatedState = mixCurrentStateAndRecoveredState(currentState, recoveredState); - assertThat(updatedState.metaData().clusterUUID(), not(equalTo("_na_"))); - assertTrue(MetaData.isGlobalStateEquals(metaData, updatedState.metaData())); + assertThat(updatedState.metaData().clusterUUID(), not(equalTo(MetaData.UNKNOWN_CLUSTER_UUID))); + assertFalse(MetaData.isGlobalStateEquals(metaData, updatedState.metaData())); assertThat(updatedState.metaData().index("test"), equalTo(indexMetaData)); assertTrue(updatedState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)); assertTrue(updatedState.blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK)); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 921bcac3d4c65..8ccfa5e406ae2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -213,22 +213,24 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException { } while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration())); ClusterState state = createClusterState(randomNonNegativeLong(), - MetaData.builder().coordinationMetaData(coordinationMetaData).build()); + MetaData.builder().coordinationMetaData(coordinationMetaData) + .clusterUUID(randomAlphaOfLength(10)).build()); gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); assertThat(gateway.getLastAcceptedState().getLastAcceptedConfiguration(), not(equalTo(gateway.getLastAcceptedState().getLastCommittedConfiguration()))); - gateway.markLastAcceptedConfigAsCommitted(); + gateway.markLastAcceptedStateAsCommitted(); CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData) .lastCommittedConfiguration(coordinationMetaData.getLastAcceptedConfiguration()).build(); ClusterState expectedClusterState = - ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData).build()).build(); + ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData) + .clusterUUID(state.metaData().clusterUUID()).clusterUUIDCommitted(true).build()).build(); gateway = maybeNew(gateway); assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState()); - gateway.markLastAcceptedConfigAsCommitted(); + gateway.markLastAcceptedStateAsCommitted(); gateway = maybeNew(gateway); assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState());