Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] PersistedState interface implementation #35819

Merged
merged 32 commits into from
Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4d41cf3
Add currentTerm and clusterStateVersion to Manifest class
Nov 20, 2018
b365cf3
Zen2 PersistedState implementation
Nov 20, 2018
7b74772
Zen2GatewayMetaState tests
Nov 21, 2018
b024574
Merge branch 'zen2_fresh' into zen2_persisted_state
Nov 21, 2018
9362b22
Add new tests to Zen2GatewayMetaStateTests
Nov 21, 2018
fd0ef74
Add equality check to markLastAcceptedConfigAsCommitted
Nov 21, 2018
f318a57
Remove unused import and fix licences
Nov 21, 2018
444e6c1
Rollback updateMetaData -> updateClusterState in comment
Nov 22, 2018
ac65dec
@SuppressWarnings("unchecked")
Nov 22, 2018
52fac02
writeWithoutComparingVersions -> incrementalWrite
Nov 23, 2018
6de8c9a
Gateway -> Zen2GatewayMetaStateUT
Nov 23, 2018
89e1421
Manifest constants TODO
Nov 23, 2018
5afb5ee
Remove updating previousClusterState in Zen2 gw
Nov 23, 2018
971aef1
Check that markAcceptedConfigAsCommitted does not touch other parts
Nov 23, 2018
a01fd54
Move Zen1 and Zen2 into GatewayMetaState
Nov 23, 2018
b1a6aef
Move GatewayMetaService from GatewayService to discovery
Nov 23, 2018
3525e0b
Merge branch 'zen2_fresh' into zen2_persisted_state
Nov 23, 2018
d358963
Add VotingTombstones to Zen2GatewayMetaStateTests
Nov 23, 2018
f176b3e
Add USE_ZEN2_PERSISTED_STATE setting and test
Nov 26, 2018
5320d2d
Fix code style
Nov 26, 2018
369ef21
Fix isDataOrMasterNode
Nov 26, 2018
2ec6e6a
Remove unused GatewayMetaState from GatewayService
Nov 26, 2018
0caaa28
Remove log from GatewayMetaState
Nov 26, 2018
133d870
Formatting
Nov 26, 2018
2181474
Remove addLowPriorityApplier from ClusterApplier interface
Nov 27, 2018
95a788f
Remove zen2 discovery type
Nov 27, 2018
5bdff70
Restore GatewayMetaState construction order
Nov 27, 2018
d91ad3c
Update comment
Nov 27, 2018
24c43fa
Assertions
Nov 27, 2018
7dbb9af
Zen2GatewayMetaStateTests -> GatewayMetaStatePersistedStateTests
Nov 27, 2018
bf2eb52
Fix typo
Nov 27, 2018
48233d3
Fix codestyle
Nov 27, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,13 @@ public interface PersistedState {
*/
default void markLastAcceptedConfigAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,29 @@
* Index metadata generation could be obtained by calling {@link #getIndexGenerations()}.
*/
public class Manifest implements ToXContentFragment {
private static final long MISSING_GLOBAL_GENERATION = -1;
//TODO revisit missing and unknown constants once Zen2 BWC is ready
private static final long MISSING_GLOBAL_GENERATION = -1L;
private static final long MISSING_CURRENT_TERM = 0L;
andrershov marked this conversation as resolved.
Show resolved Hide resolved
private static final long UNKNOWN_CURRENT_TERM = MISSING_CURRENT_TERM;
private static final long MISSING_CLUSTER_STATE_VERSION = 0L;
private static final long UNKNOWN_CLUSTER_STATE_VERSION = MISSING_CLUSTER_STATE_VERSION;

private final long globalGeneration;
private final Map<Index, Long> indexGenerations;
private final long currentTerm;
private final long clusterStateVersion;

public Manifest(long globalGeneration, Map<Index, Long> indexGenerations) {
public Manifest(long currentTerm, long clusterStateVersion, long globalGeneration, Map<Index, Long> indexGenerations) {
this.currentTerm = currentTerm;
this.clusterStateVersion = clusterStateVersion;
this.globalGeneration = globalGeneration;
this.indexGenerations = indexGenerations;
}

public static Manifest unknownCurrentTermAndVersion(long globalGeneration, Map<Index, Long> indexGenerations) {
return new Manifest(UNKNOWN_CURRENT_TERM, UNKNOWN_CLUSTER_STATE_VERSION, globalGeneration, indexGenerations);
}

/**
* Returns global metadata generation.
*/
Expand All @@ -68,18 +81,38 @@ public Map<Index, Long> getIndexGenerations() {
return indexGenerations;
}

public long getCurrentTerm() {
return currentTerm;
}

public long getClusterStateVersion() {
return clusterStateVersion;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Manifest manifest = (Manifest) o;
return globalGeneration == manifest.globalGeneration &&
Objects.equals(indexGenerations, manifest.indexGenerations);
return currentTerm == manifest.currentTerm &&
clusterStateVersion == manifest.clusterStateVersion &&
globalGeneration == manifest.globalGeneration &&
Objects.equals(indexGenerations, manifest.indexGenerations);
}

@Override
public int hashCode() {
return Objects.hash(globalGeneration, indexGenerations);
return Objects.hash(currentTerm, clusterStateVersion, globalGeneration, indexGenerations);
}

@Override
public String toString() {
return "Manifest{" +
"currentTerm=" + currentTerm +
", clusterStateVersion=" + clusterStateVersion +
", globalGeneration=" + globalGeneration +
", indexGenerations=" + indexGenerations +
'}';
}

private static final String MANIFEST_FILE_PREFIX = "manifest-";
Expand All @@ -103,37 +136,57 @@ public Manifest fromXContent(XContentParser parser) throws IOException {
* Code below this comment is for XContent parsing/generation
*/

private static final ParseField CURRENT_TERM_PARSE_FIELD = new ParseField("current_term");
private static final ParseField CLUSTER_STATE_VERSION_PARSE_FIELD = new ParseField("cluster_state_version");
private static final ParseField GENERATION_PARSE_FIELD = new ParseField("generation");
private static final ParseField INDEX_GENERATIONS_PARSE_FIELD = new ParseField("index_generations");

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(CURRENT_TERM_PARSE_FIELD.getPreferredName(), currentTerm);
builder.field(CLUSTER_STATE_VERSION_PARSE_FIELD.getPreferredName(), clusterStateVersion);
builder.field(GENERATION_PARSE_FIELD.getPreferredName(), globalGeneration);
builder.array(INDEX_GENERATIONS_PARSE_FIELD.getPreferredName(), indexEntryList().toArray());
return builder;
}

private static long requireNonNullElseDefault(Long value, long defaultValue) {
return value != null ? value : defaultValue;
}

private List<IndexEntry> indexEntryList() {
return indexGenerations.entrySet().stream().
map(entry -> new IndexEntry(entry.getKey(), entry.getValue())).
collect(Collectors.toList());
}

private static long generation(Object[] generationAndListOfIndexEntries) {
return (Long) generationAndListOfIndexEntries[0];
private static long currentTerm(Object[] manifestFields) {
return requireNonNullElseDefault((Long) manifestFields[0], MISSING_CURRENT_TERM);
}

private static long clusterStateVersion(Object[] manifestFields) {
return requireNonNullElseDefault((Long) manifestFields[1], MISSING_CLUSTER_STATE_VERSION);
}

private static long generation(Object[] manifestFields) {
return requireNonNullElseDefault((Long) manifestFields[2], MISSING_GLOBAL_GENERATION);
}

private static Map<Index, Long> indices(Object[] generationAndListOfIndexEntries) {
List<IndexEntry> listOfIndices = (List<IndexEntry>) generationAndListOfIndexEntries[1];
@SuppressWarnings("unchecked")
private static Map<Index, Long> indices(Object[] manifestFields) {
List<IndexEntry> listOfIndices = (List<IndexEntry>) manifestFields[3];
return listOfIndices.stream().collect(Collectors.toMap(IndexEntry::getIndex, IndexEntry::getGeneration));
}

private static final ConstructingObjectParser<Manifest, Void> PARSER = new ConstructingObjectParser<>(
"manifest",
generationAndListOfIndexEntries ->
new Manifest(generation(generationAndListOfIndexEntries), indices(generationAndListOfIndexEntries)));
manifestFields ->
new Manifest(currentTerm(manifestFields), clusterStateVersion(manifestFields), generation(manifestFields),
indices(manifestFields)));

static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), CURRENT_TERM_PARSE_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_STATE_VERSION_PARSE_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_PARSE_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), IndexEntry.INDEX_ENTRY_PARSER, INDEX_GENERATIONS_PARSE_FIELD);
}
Expand All @@ -143,11 +196,12 @@ public static Manifest fromXContent(XContentParser parser) throws IOException {
}

public boolean isEmpty() {
return globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty();
return currentTerm == MISSING_CURRENT_TERM && clusterStateVersion == MISSING_CLUSTER_STATE_VERSION
&& globalGeneration == MISSING_GLOBAL_GENERATION && indexGenerations.isEmpty();
}

public static Manifest empty() {
return new Manifest(MISSING_GLOBAL_GENERATION, Collections.emptyMap());
return new Manifest(MISSING_CURRENT_TERM, MISSING_CLUSTER_STATE_VERSION, MISSING_GLOBAL_GENERATION, Collections.emptyMap());
}

public boolean isGlobalGenerationMissing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -73,7 +74,7 @@ public class DiscoveryModule {
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService, Path configFile) {
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
Expand Down Expand Up @@ -118,15 +119,16 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService).entrySet().forEach(entry -> {
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
}
});
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState).entrySet()
.forEach(entry -> {
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
}
});
}
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand All @@ -58,6 +59,7 @@
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
Expand Down Expand Up @@ -159,7 +161,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState) {
super(settings);
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
Expand Down Expand Up @@ -227,6 +229,10 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t

transportService.registerRequestHandler(
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());

if (clusterApplier instanceof ClusterApplierService) {
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
}
}

static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
Expand Down
Loading