Skip to content

Commit

Permalink
Revert "Do not renew sync-id if all shards are sealed (#29103)"
Browse files Browse the repository at this point in the history
This reverts commit 25b4d9e.
  • Loading branch information
DaveCTurner committed Mar 20, 2018
1 parent 93370aa commit ec4db0f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ public Engine.CommitId getRawCommitId() {
return new Engine.CommitId(Base64.getDecoder().decode(id));
}

/**
* The synced-flush id of the commit if existed.
*/
public String syncId() {
return userData.get(InternalEngine.SYNC_COMMIT_ID);
}

/**
* Returns the number of documents in the in this commit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -67,7 +65,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -219,16 +216,9 @@ public void onResponse(InFlightOpsResponse response) {
if (inflight != 0) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
} else {
// 3. now send the sync request to all the shards;
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
if (sharedSyncId != null) {
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
}else {
String syncId = UUIDs.base64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
}
// 3. now send the sync request to all the shards
String syncId = UUIDs.base64UUID();
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
}
}

Expand All @@ -254,33 +244,6 @@ public void onFailure(Exception e) {
}
}

private String sharedExistingSyncId(Map<String, PreSyncedFlushResponse> preSyncedFlushResponses) {
String existingSyncId = null;
for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) {
if (Strings.isNullOrEmpty(resp.existingSyncId)) {
return null;
}
if (existingSyncId == null) {
existingSyncId = resp.existingSyncId;
}
if (existingSyncId.equals(resp.existingSyncId) == false) {
return null;
}
}
return existingSyncId;
}

private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
for (final ShardRouting shard : shards) {
if (preSyncResponses.containsKey(shard.currentNodeId())) {
results.put(shard, new ShardSyncedFlushResponse());
}
}
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
}

final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
if (indexRoutingTable == null) {
Expand Down Expand Up @@ -475,7 +438,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
final CommitStats commitStats = indexShard.commitStats();
final Engine.CommitId commitId = commitStats.getRawCommitId();
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
}

private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
Expand Down Expand Up @@ -547,19 +510,24 @@ public ShardId shardId() {
static final class PreSyncedFlushResponse extends TransportResponse {
static final int UNKNOWN_NUM_DOCS = -1;
public static final int V_6_2_2_ID = 6020299;
public static final int V_6_3_0_ID = 6030099;

Engine.CommitId commitId;
int numDocs;
@Nullable String existingSyncId = null;

PreSyncedFlushResponse() {
}

PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
this.commitId = commitId;
this.numDocs = numDocs;
this.existingSyncId = existingSyncId;
}

Engine.CommitId commitId() {
return commitId;
}

int numDocs() {
return numDocs;
}

boolean includeNumDocs(Version version) {
Expand All @@ -570,14 +538,6 @@ boolean includeNumDocs(Version version) {
}
}

boolean includeExistingSyncId(Version version) {
if (version.major == Version.V_5_6_9_UNRELEASED.major) {
return version.onOrAfter(Version.V_5_6_9_UNRELEASED);
} else {
return version.id >= V_6_3_0_ID;
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -587,9 +547,6 @@ public void readFrom(StreamInput in) throws IOException {
} else {
numDocs = UNKNOWN_NUM_DOCS;
}
if (includeExistingSyncId(in.getVersion())) {
existingSyncId = in.readOptionalString();
}
}

@Override
Expand All @@ -599,9 +556,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (includeNumDocs(out.getVersion())) {
out.writeInt(numDocs);
}
if (includeExistingSyncId(out.getVersion())) {
out.writeOptionalString(existingSyncId);
}
}
}

Expand Down
49 changes: 0 additions & 49 deletions core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.lucene.index.Term;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand All @@ -30,7 +29,6 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -61,7 +59,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class FlushIT extends ESIntegTestCase {
Expand Down Expand Up @@ -284,50 +281,4 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}

public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
assertAcked(
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
);
ensureGreen();
final Index index = clusterService().state().metaData().index("test").getIndex();
final ShardId shardId = new ShardId(index, 0);
final int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
final int moreDocs = between(1, 10);
for (int i = 0; i < moreDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test")))
.getShardOrNull(shardId);
if (randomBoolean()) {
// Change the existing sync-id of a single shard.
shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId());
assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId())));
} else {
// Flush will create a new commit without sync-id
shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}
}

0 comments on commit ec4db0f

Please sign in to comment.