Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce cluster UUIDs #37775

Merged
merged 20 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(),
applyCommit.getVersion());

persistedState.markLastAcceptedConfigAsCommitted();
persistedState.markLastAcceptedStateAsCommitted();
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -471,16 +471,28 @@ public interface PersistedState {
/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
* with the last committed configuration now corresponding to the last accepted configuration.
* with the last committed configuration now corresponding to the last accepted configuration, and the cluster uuid, if set,
* marked as committed.
*/
default void markLastAcceptedConfigAsCommitted() {
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
MetaData.Builder metaDataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
metaDataBuilder.coordinationMetaData(coordinationMetaData);
}
if (lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we commit a state without a cluster UUID? Feels like this should be an assertion to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the master node is a Zen1 node that has not recovered its state yet, that can unfortunately be the case (testMixedClusterFormation found this). I've added an assertion to that effect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write the assertion in a way that means we will have to remove it when Zen1 is no more? E.g. mention ZEN1_BWC_TERM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 2697b44

lastAcceptedState.metaData().clusterUUIDCommitted() == false) {
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
}
metaDataBuilder.clusterUUIDCommitted(true);
}
if (metaDataBuilder != null) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.masterService = masterService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this.persistedStateSupplier = persistedStateSupplier;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
Expand Down Expand Up @@ -279,7 +279,18 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
+ lastKnownLeader + ", rejecting");
}

if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) {
final ClusterState localState = coordinationState.get().getLastAcceptedState();

if (localState.metaData().clusterUUIDCommitted() &&
localState.metaData().clusterUUID().equals(publishRequest.getAcceptedState().metaData().clusterUUID()) == false) {
logger.warn("received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
sourceNode, publishRequest.getAcceptedState().metaData().clusterUUID(), localState.metaData().clusterUUID());
throw new CoordinationStateRejectedException("received cluster state from " + sourceNode +
" with a different cluster uuid " + publishRequest.getAcceptedState().metaData().clusterUUID() +
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
}

if (publishRequest.getAcceptedState().term() > localState.term()) {
// only do join validation if we have not accepted state from this master yet
onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
}
Expand Down Expand Up @@ -621,6 +632,7 @@ public void invariant() {
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class JoinHelper {

Expand All @@ -84,7 +85,7 @@ public class JoinHelper {
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();

public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
this.masterService = masterService;
Expand Down Expand Up @@ -132,6 +133,13 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
final ClusterState localState = currentStateSupplier.get();
if (localState.metaData().clusterUUIDCommitted() &&
localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) {
throw new CoordinationStateRejectedException("join validation on cluster state" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason why we log warning message in Coordinator, but do not log it here?

" with a different cluster uuid " + request.getState().metaData().clusterUUID() +
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
private static final Logger logger = LogManager.getLogger(MetaData.class);

public static final String ALL = "_all";
public static final String UNKNOWN_CLUSTER_UUID = "_na_";

public enum XContentContext {
/* Custom metadata should be returns as part of API call */
Expand Down Expand Up @@ -159,6 +160,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

private final String clusterUUID;
private final boolean clusterUUIDCommitted;
private final long version;

private final CoordinationMetaData coordinationMetaData;
Expand All @@ -179,12 +181,13 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust

private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;

MetaData(String clusterUUID, long version, CoordinationMetaData coordinationMetaData,
MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData,
Settings transientSettings, Settings persistentSettings,
ImmutableOpenMap<String, IndexMetaData> indices, ImmutableOpenMap<String, IndexTemplateMetaData> templates,
ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices,
SortedMap<String, AliasOrIndex> aliasAndIndexLookup) {
this.clusterUUID = clusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.version = version;
this.coordinationMetaData = coordinationMetaData;
this.transientSettings = transientSettings;
Expand Down Expand Up @@ -218,6 +221,14 @@ public String clusterUUID() {
return this.clusterUUID;
}

/**
* Whether the current node with the given cluster state is locked into the cluster with the UUID returned by {@link #clusterUUID()},
* meaning that it will not accept any cluster state with a different clusterUUID.
*/
public boolean clusterUUIDCommitted() {
return this.clusterUUIDCommitted;
}

/**
* Returns the merged transient and persistent settings.
*/
Expand Down Expand Up @@ -757,6 +768,12 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2
if (!metaData1.templates.equals(metaData2.templates())) {
return false;
}
if (!metaData1.clusterUUID.equals(metaData2.clusterUUID)) {
return false;
}
if (metaData1.clusterUUIDCommitted != metaData2.clusterUUIDCommitted) {
return false;
}
// Check if any persistent metadata needs to be saved
int customCount1 = 0;
for (ObjectObjectCursor<String, Custom> cursor : metaData1.customs) {
Expand Down Expand Up @@ -798,6 +815,7 @@ private static class MetaDataDiff implements Diff<MetaData> {

private long version;
private String clusterUUID;
private boolean clusterUUIDCommitted;
private CoordinationMetaData coordinationMetaData;
private Settings transientSettings;
private Settings persistentSettings;
Expand All @@ -807,6 +825,7 @@ private static class MetaDataDiff implements Diff<MetaData> {

MetaDataDiff(MetaData before, MetaData after) {
clusterUUID = after.clusterUUID;
clusterUUIDCommitted = after.clusterUUIDCommitted;
version = after.version;
coordinationMetaData = after.coordinationMetaData;
transientSettings = after.transientSettings;
Expand All @@ -818,8 +837,11 @@ private static class MetaDataDiff implements Diff<MetaData> {

MetaDataDiff(StreamInput in) throws IOException {
clusterUUID = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
clusterUUIDCommitted = in.readBoolean();
}
version = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) { //TODO revisit after Zen2 BWC is implemented
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData = new CoordinationMetaData(in);
} else {
coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
Expand All @@ -836,6 +858,9 @@ private static class MetaDataDiff implements Diff<MetaData> {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterUUID);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(clusterUUIDCommitted);
}
out.writeLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData.writeTo(out);
Expand All @@ -851,6 +876,7 @@ public void writeTo(StreamOutput out) throws IOException {
public MetaData apply(MetaData part) {
Builder builder = builder();
builder.clusterUUID(clusterUUID);
builder.clusterUUIDCommitted(clusterUUIDCommitted);
builder.version(version);
builder.coordinationMetaData(coordinationMetaData);
builder.transientSettings(transientSettings);
Expand All @@ -866,6 +892,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readLong();
builder.clusterUUID = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
builder.clusterUUIDCommitted = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
builder.coordinationMetaData(new CoordinationMetaData(in));
}
Expand All @@ -891,6 +920,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeString(clusterUUID);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(clusterUUIDCommitted);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData.writeTo(out);
}
Expand Down Expand Up @@ -930,6 +962,7 @@ public static Builder builder(MetaData metaData) {
public static class Builder {

private String clusterUUID;
private boolean clusterUUIDCommitted;
private long version;

private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
Expand All @@ -941,7 +974,7 @@ public static class Builder {
private final ImmutableOpenMap.Builder<String, Custom> customs;

public Builder() {
clusterUUID = "_na_";
clusterUUID = UNKNOWN_CLUSTER_UUID;
indices = ImmutableOpenMap.builder();
templates = ImmutableOpenMap.builder();
customs = ImmutableOpenMap.builder();
Expand All @@ -950,6 +983,7 @@ public Builder() {

public Builder(MetaData metaData) {
this.clusterUUID = metaData.clusterUUID;
this.clusterUUIDCommitted = metaData.clusterUUIDCommitted;
this.coordinationMetaData = metaData.coordinationMetaData;
this.transientSettings = metaData.transientSettings;
this.persistentSettings = metaData.persistentSettings;
Expand Down Expand Up @@ -1125,8 +1159,13 @@ public Builder clusterUUID(String clusterUUID) {
return this;
}

public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
this.clusterUUIDCommitted = clusterUUIDCommitted;
return this;
}

public Builder generateClusterUuidIfNeeded() {
if (clusterUUID.equals("_na_")) {
if (clusterUUID.equals(UNKNOWN_CLUSTER_UUID)) {
clusterUUID = UUIDs.randomBase64UUID();
}
return this;
Expand Down Expand Up @@ -1182,8 +1221,9 @@ public MetaData build() {
String[] allOpenIndicesArray = allOpenIndices.toArray(new String[allOpenIndices.size()]);
String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]);

return new MetaData(clusterUUID, version, coordinationMetaData, transientSettings, persistentSettings, indices.build(),
templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, aliasAndIndexLookup);
return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings,
indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray,
aliasAndIndexLookup);
}

private SortedMap<String, AliasOrIndex> buildAliasAndIndexLookup() {
Expand Down Expand Up @@ -1226,6 +1266,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon

builder.field("version", metaData.version());
builder.field("cluster_uuid", metaData.clusterUUID);
builder.field("cluster_uuid_committed", metaData.clusterUUIDCommitted);

builder.startObject("cluster_coordination");
metaData.coordinationMetaData().toXContent(builder, params);
Expand Down Expand Up @@ -1324,6 +1365,8 @@ public static MetaData fromXContent(XContentParser parser) throws IOException {
builder.version = parser.longValue();
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
builder.clusterUUID = parser.text();
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
builder.clusterUUIDCommitted = parser.booleanValue();
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
Expand Down
Loading