Skip to content

Commit

Permalink
Ensure changes requests return latest mapping version
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 19, 2019
1 parent 9b32f57 commit ba0df59
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,21 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
final IndexMetaData indexMetaData = indexService.getMetaData();
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void start(
}

// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
updateMapping(followerMappingVersion -> {
updateMapping(0L, followerMappingVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
Expand Down Expand Up @@ -378,7 +378,7 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion,
} else {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(mappingVersion -> {
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
currentMappingVersion = mappingVersion;
task.run();
});
Expand All @@ -400,12 +400,13 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0));
}

private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(minRequiredMappingVersion, handler,
e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
Expand Down Expand Up @@ -468,7 +469,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
}

// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
// the returned mapping is outdated - retry again
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
innerUpdateMapping(minRequiredMappingVersion, handler, errorHandler);
return;
}
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,42 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.hamcrest.Matchers;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class FollowerFailOverIT extends CcrIntegTestCase {

Expand Down Expand Up @@ -220,4 +230,54 @@ public void testAddNewReplicasOnFollower() throws Exception {
pauseFollow("follower-index");
}

public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
InternalTestCluster leaderCluster = getLeaderCluster();
Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build();
String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes);
assertAcked(
leaderClient().admin().indices().prepareCreate("leader-index")
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")
.put("index.routing.allocation.require.box", "large"))
.get()
);
ClusterService clusterService = leaderCluster.clusterService(dataNode);
ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId();
IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode);
IndexShard indexShard = indicesService.getShardOrNull(shardId);
final CountDownLatch latch = new CountDownLatch(1);
clusterService.addLowPriorityApplier(event -> {
IndexMetaData imd = event.state().metaData().index("leader-index");
if (imd != null && imd.mapping() != null &&
XContentMapValues.extractValue("properties.balance.type", imd.mapping().sourceAsMap()) != null) {
try {
logger.info("--> block ClusterService from exposing new mapping version");
latch.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
});
leaderCluster.client().admin().indices().preparePutMapping().setType("doc")
.setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get();
IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1")
.setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
getFollowerCluster().startDataOnlyNode(nodeAttributes);
followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
ensureFollowerGreen("follower-index");
assertBusy(() -> {
CcrClient ccrClient = new CcrClient(followerClient());
FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
assertThat(bytesRead, Matchers.greaterThan(0L));
});
latch.countDown();
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
pauseFollow("follower-index");
ensureNoCcrTasks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR
private final Map<Long, Integer> fromToSlot = new HashMap<>();

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
handler.accept(mappingVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params)
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
Exception failure = mappingUpdateFailures.poll();
if (failure != null) {
errorHandler.accept(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ protected synchronized void onOperationsFetched(Translog.Operation[] operations)
}

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
// noop, as mapping updates are not tested
handler.accept(1L);
}
Expand Down

0 comments on commit ba0df59

Please sign in to comment.