Skip to content

Commit

Permalink
return metadata_version
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 21, 2019
1 parent ba0df59 commit 4c26292
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final List<IndexEventListener> buildInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
private volatile ClusterState clusterState; // the latest applied cluster state

@Inject
public IndicesClusterStateService(
Expand Down Expand Up @@ -229,6 +230,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
return;
}

this.clusterState = state;
updateFailedShardsCache(state);

deleteIndices(event); // also deletes shards of deleted indices
Expand All @@ -246,6 +248,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
createOrUpdateShards(state);
}

/**
* Returns the latest applied cluster state
*/
public ClusterState getClusterState() {
return clusterState;
}

/**
* Removes shard entries from the failed shards cache that are no longer allocated to this node by the master.
* Sends shard failures for shards that are marked as actively allocated to this node but don't actually exist on the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -34,6 +36,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -212,6 +215,12 @@ public long getSettingsVersion() {
return settingsVersion;
}

private long metadataVersion;

public long getMetadataVersion() {
return metadataVersion;
}

private long globalCheckpoint;

public long getGlobalCheckpoint() {
Expand Down Expand Up @@ -248,6 +257,7 @@ public long getTookInMillis() {
Response(
final long mappingVersion,
final long settingsVersion,
final long metadataVersion,
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
Expand All @@ -256,6 +266,7 @@ public long getTookInMillis() {

this.mappingVersion = mappingVersion;
this.settingsVersion = settingsVersion;
this.metadataVersion = metadataVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
Expand All @@ -268,6 +279,11 @@ public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
metadataVersion = in.readVLong();
} else {
metadataVersion = 0L;
}
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
Expand All @@ -280,6 +296,9 @@ public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVLong(metadataVersion);
}
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
Expand All @@ -294,6 +313,7 @@ public boolean equals(final Object o) {
final Response that = (Response) o;
return mappingVersion == that.mappingVersion &&
settingsVersion == that.settingsVersion &&
metadataVersion == that.metadataVersion &&
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Expand All @@ -306,6 +326,7 @@ public int hashCode() {
return Objects.hash(
mappingVersion,
settingsVersion,
metadataVersion,
globalCheckpoint,
maxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
Expand All @@ -317,17 +338,20 @@ public int hashCode() {
public static class TransportAction extends TransportSingleShardAction<Request, Response> {

private final IndicesService indicesService;
private final IndicesClusterStateService indicesClusterStateService;

@Inject
public TransportAction(ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
IndicesService indicesService,
IndicesClusterStateService indicesClusterStateService) {
super(NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
this.indicesClusterStateService = indicesClusterStateService;
}

@Override
Expand All @@ -347,12 +371,14 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
final IndexMetaData indexMetaData = indexService.getMetaData();
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
metaData.version(),
seqNoStats,
maxSeqNoOfUpdatesOrDeletes,
operations,
Expand Down Expand Up @@ -432,7 +458,8 @@ private void globalCheckpointAdvancementFailure(
e);
if (e instanceof TimeoutException) {
try {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
Expand All @@ -441,6 +468,7 @@ private void globalCheckpointAdvancementFailure(
getResponse(
mappingVersion,
settingsVersion,
metaData.version(),
latestSeqNoStats,
maxSeqNoOfUpdatesOrDeletes,
EMPTY_OPERATIONS_ARRAY,
Expand Down Expand Up @@ -532,6 +560,7 @@ static Translog.Operation[] getOperations(
static Response getResponse(
final long mappingVersion,
final long settingsVersion,
final long metadataVersion,
final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates,
final Translog.Operation[] operations,
Expand All @@ -541,6 +570,7 @@ static Response getResponse(
return new Response(
mappingVersion,
settingsVersion,
metadataVersion,
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
protected ShardChangesAction.Response createTestInstance() {
final long mappingVersion = randomNonNegativeLong();
final long settingsVersion = randomNonNegativeLong();
final long metadataVersion = randomNonNegativeLong();
final long leaderGlobalCheckpoint = randomNonNegativeLong();
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE);
Expand All @@ -25,6 +26,7 @@ protected ShardChangesAction.Response createTestInstance() {
return new ShardChangesAction.Response(
mappingVersion,
settingsVersion,
metadataVersion,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
assert from >= testRun.finalExpectedGlobalCheckpoint;
final long globalCheckpoint = tracker.getCheckpoint();
final long maxSeqNo = tracker.getMaxSeqNo();
handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
handler.accept(new ShardChangesAction.Response(0L, 0L, 1L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
new Translog.Operation[0], 1L));
}
};
Expand Down Expand Up @@ -249,6 +249,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
new ShardChangesAction.Response(
mappingVersion,
settingsVersion,
1L,
nextGlobalCheckPoint,
nextGlobalCheckPoint,
randomNonNegativeLong(),
Expand All @@ -274,6 +275,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersion,
settingsVersion,
1L,
prevGlobalCheckpoint,
prevGlobalCheckpoint,
randomNonNegativeLong(),
Expand All @@ -293,6 +295,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersion,
settingsVersion,
1L,
localLeaderGCP,
localLeaderGCP,
randomNonNegativeLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ public void testReceiveNothingExpectedSomething() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 1L, 0, 0, 100, new Translog.Operation[0], 1L));

assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
Expand Down Expand Up @@ -1022,6 +1022,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
final ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersions.poll(),
0L,
1L,
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
randomNonNegativeLong(),
Expand Down Expand Up @@ -1058,6 +1059,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro
return new ShardChangesAction.Response(
mappingVersion,
settingsVersion,
1L,
leaderGlobalCheckPoint,
leaderGlobalCheckPoint,
randomNonNegativeLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,15 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
if (from > seqNoStats.getGlobalCheckpoint()) {
handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats,
handler.accept(ShardChangesAction.getResponse(1L, 1L, 1L, seqNoStats,
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
return;
}
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize());
// hard code mapping version; this is ok, as mapping updates are not tested here
final ShardChangesAction.Response response = new ShardChangesAction.Response(
1L,
1L,
1L,
seqNoStats.getGlobalCheckpoint(),
Expand Down

0 comments on commit 4c26292

Please sign in to comment.