Skip to content

Commit

Permalink
Ensure changes requests return the latest mapping version (#37633)
Browse files Browse the repository at this point in the history
Today we keep the mapping on the follower in sync with the leader's
using the mapping version from changes requests. There are two rare
cases where the mapping on the follower is not synced properly:

1. The returned mapping version (from ClusterService) is outdated than
the actual mapping. This happens because we expose the latest cluster
state in ClusterService after applying it to IndexService.

2. It's possible for the FollowTask to receive an outdated mapping than
the min_required_mapping. In that case, it should fetch the mapping
again; otherwise, the follower won't have the right mapping.

Relates to #31140
  • Loading branch information
dnhatn authored Jan 23, 2019
1 parent d7fe4e5 commit 0096f1b
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
ThreadPool threadPool,
Client client,
SettingsModule settingsModule) {
IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings();
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings));
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, settingsModule));
}

public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ public final class CcrSettings {
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator and shard follow task should be using.
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
public static final Setting<TimeValue> CCR_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
* TODO: Deprecate and remove this setting
*/
private static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", CCR_WAIT_FOR_METADATA_TIMEOUT, Property.NodeScope, Property.Dynamic);

/**
* Max bytes a node can recover per second.
Expand Down Expand Up @@ -62,7 +68,8 @@ static List<Setting<?>> getSettings() {
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
CCR_WAIT_FOR_METADATA_TIMEOUT);
}

private final CombinedRateLimiter ccrRateLimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, Elastics
waitForMetadataTimeOut = newWaitForTimeOut;
}
};
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater);
waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT, updater);
waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settings);
}

public synchronized AutoFollowStats getStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,21 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
final IndexMetaData indexMetaData = indexService.getMetaData();
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void start(
}

// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
updateMapping(followerMappingVersion -> {
updateMapping(0L, followerMappingVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
Expand Down Expand Up @@ -370,15 +370,15 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse
coordinateReads();
}

private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) {
private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, Runnable task) {
if (currentMappingVersion >= minimumRequiredMappingVersion) {
LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
task.run();
} else {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(mappingVersion -> {
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
currentMappingVersion = mappingVersion;
task.run();
});
Expand All @@ -400,12 +400,13 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0));
}

private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(minRequiredMappingVersion, handler,
e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
Expand Down Expand Up @@ -471,7 +472,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
}

// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
Expand All @@ -46,6 +48,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
Expand All @@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndexScopedSettings indexScopedSettings;
private volatile TimeValue waitForMetadataTimeOut;

public ShardFollowTasksExecutor(Client client,
ThreadPool threadPool,
ClusterService clusterService,
IndexScopedSettings indexScopedSettings) {
SettingsModule settingsModule) {
super(ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexScopedSettings = indexScopedSettings;
this.indexScopedSettings = settingsModule.getIndexScopedSettings();
this.waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settingsModule.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT,
newVal -> this.waitForMetadataTimeOut = newVal);
}

@Override
Expand Down Expand Up @@ -112,33 +119,25 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
return;
}

assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
final Index followerIndex = params.getFollowShardId().getIndex();
getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap(
indexMetaData -> {
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
return;
}
assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
},
errorHandler
));
}

@Override
Expand Down Expand Up @@ -257,6 +256,39 @@ private Client remoteClient(ShardFollowTask params) {
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
}

private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion,
ShardFollowTask params, ActionListener<IndexMetaData> listener) {
final Index leaderIndex = params.getLeaderShardId().getIndex();
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
if (minRequiredMetadataVersion > 0) {
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut);
}
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(
r -> {
// if wait_for_metadata_version timeout, the response is empty
if (r.getState() == null) {
assert minRequiredMetadataVersion > 0;
getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener);
return;
}
final MetaData metaData = r.getState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex);
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
// ask for the next version.
getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener);
} else {
assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion;
listener.onResponse(indexMetaData);
}
},
listener::onFailure
));
} catch (Exception e) {
listener.onFailure(e);
}
}

interface FollowerStatsInfoHandler {
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected Settings nodeSettings() {
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
return builder.build();
}

Expand Down
Loading

0 comments on commit 0096f1b

Please sign in to comment.