Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for ccr recovery action #37840

Merged
merged 14 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public final class CcrSettings {
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
 * The timeout value to use for requests made as part of the CCR recovery process.
 * Registered under the key {@code ccr.indices.recovery.internal_action_timeout} with a default
 * of 60 seconds, and declared with the {@code Dynamic} and {@code NodeScope} properties.
 */
public static final Setting<TimeValue> INDICES_RECOVERY_ACTION_TIMEOUT_SETTING =
Setting.positiveTimeSetting("ccr.indices.recovery.internal_action_timeout", TimeValue.timeValueSeconds(60),
Property.Dynamic, Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -67,19 +74,23 @@ static List<Setting<?>> getSettings() {
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
CCR_WAIT_FOR_METADATA_TIMEOUT);
}

private final CombinedRateLimiter ccrRateLimiter;
private volatile TimeValue recoveryActivityTimeout;
private volatile TimeValue recoveryActionTimeout;

/**
 * Captures the initial values of the CCR recovery settings and subscribes to later updates.
 *
 * @param settings        the settings the initial values are read from
 * @param clusterSettings the registry on which the update consumers are installed
 */
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
    // Read the starting value of every setting this object tracks.
    this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
    this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
    this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);

    // Route subsequent updates of each setting to its private setter.
    clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
    clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
    clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
Expand All @@ -90,11 +101,19 @@ private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) {
this.recoveryActivityTimeout = recoveryActivityTimeout;
}

/**
 * Stores a new recovery action timeout; registered in the constructor as the update consumer
 * for {@code INDICES_RECOVERY_ACTION_TIMEOUT_SETTING}.
 *
 * @param updatedTimeout the value to hand out from {@code getRecoveryActionTimeout()} from now on
 */
private void setRecoveryActionTimeout(TimeValue updatedTimeout) {
    recoveryActionTimeout = updatedTimeout;
}

/**
 * @return the rate limiter instance created in the constructor from {@code RECOVERY_MAX_BYTES_PER_SECOND}
 */
public CombinedRateLimiter getRateLimiter() {
    return this.ccrRateLimiter;
}

/**
 * @return the most recently stored value of the recovery activity timeout (read from a volatile field)
 */
public TimeValue getRecoveryActivityTimeout() {
    return this.recoveryActivityTimeout;
}

/**
 * @return the most recently stored value of the recovery action timeout (read from a volatile field)
 */
public TimeValue getRecoveryActionTimeout() {
    return this.recoveryActionTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public RepositoryMetaData getMetadata() {
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get();
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true)
.get(ccrSettings.getRecoveryActionTimeout());
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
indicesMap.keysIt().forEachRemaining(indices::add);
Expand All @@ -138,7 +139,8 @@ public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
// We set a single dummy index name to avoid fetching all the index data
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
.actionGet(ccrSettings.getRecoveryActionTimeout());
return clusterState.getState().metaData();
}

Expand All @@ -149,13 +151,14 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
.actionGet(ccrSettings.getRecoveryActionTimeout());

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();
String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout());

IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData);
// Adding the leader index uuid for each shard as custom metadata:
Expand All @@ -172,7 +175,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
@Override
public RepositoryData getRepositoryData() {
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get();
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
.get(ccrSettings.getRecoveryActionTimeout());
MetaData remoteMetaData = response.getState().getMetaData();

Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
Expand Down Expand Up @@ -282,25 +286,26 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve

/**
 * Applies the leader index mapping to the follower index when the leader's mapping version is
 * greater than the follower's. Both blocking calls wait at most the configured recovery action
 * timeout instead of waiting indefinitely.
 *
 * @param localClient           client used to put the mapping on the follower index
 * @param remoteClient          client used to fetch the leader index metadata
 * @param leaderIndex           the leader index whose metadata is fetched
 * @param followerIndexSettings settings of the follower index, used for its mapping version and name
 */
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
    ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
    // Single, timeout-bounded fetch of the leader's cluster state.
    ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
        .actionGet(ccrSettings.getRecoveryActionTimeout());
    IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
    long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();

    if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
        Index followerIndex = followerIndexSettings.getIndex();
        MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
        PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
        // The mapping is put exactly once, again bounded by the recovery action timeout.
        localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
    }
}

private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
}

private static class RestoreSession extends FileRestoreContext implements Closeable {
Expand All @@ -311,18 +316,18 @@ private static class RestoreSession extends FileRestoreContext implements Closea
private final String sessionUUID;
private final DiscoveryNode node;
private final Store.MetadataSnapshot sourceMetaData;
// Held in place of a bare rate limiter; also supplies the recovery action timeout.
private final CcrSettings ccrSettings;
private final LongConsumer throttleListener;

/**
 * Creates a restore session bound to one session UUID on one node.
 *
 * @param repositoryName   forwarded to the superclass constructor
 * @param remoteClient     client stored for the requests this session issues
 * @param sessionUUID      identifier of the restore session
 * @param node             node stored alongside the session UUID
 * @param indexShard       forwarded to the superclass constructor
 * @param recoveryState    forwarded to the superclass constructor
 * @param sourceMetaData   metadata snapshot stored for this session
 * @param ccrSettings      CCR settings stored for this session
 * @param throttleListener listener stored for this session
 */
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
               RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
               LongConsumer throttleListener) {
    super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
    this.remoteClient = remoteClient;
    this.sessionUUID = sessionUUID;
    this.node = node;
    this.sourceMetaData = sourceMetaData;
    this.ccrSettings = ccrSettings;
    this.throttleListener = throttleListener;
}

Expand All @@ -338,14 +343,14 @@ void restoreFiles() throws IOException {

/**
 * Creates the stream used to read the given file for this session.
 *
 * @param fileInfo the file whose metadata is handed to the stream
 * @return a new {@code RestoreFileInputStream} built from this session's client, UUID, node and settings
 */
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
    // Single return: the stream is constructed from ccrSettings, not from a separate rate limiter.
    return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener);
}

/**
 * Sends one clear-session request for this session's UUID and node, waiting at most the
 * configured recovery action timeout for it to complete.
 */
@Override
public void close() {
    ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
    // The response is not used, so it is not kept in a local.
    remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
}
}

Expand All @@ -356,17 +361,19 @@ private static class RestoreFileInputStream extends InputStream {
private final DiscoveryNode node;
private final StoreFileMetaData fileToRecover;
private final CombinedRateLimiter rateLimiter;
private final CcrSettings ccrSettings;
private final LongConsumer throttleListener;

private long pos = 0;

/**
 * Creates a stream for one file of a restore session.
 *
 * @param remoteClient     client stored for the requests this stream issues
 * @param sessionUUID      identifier of the restore session
 * @param node             node stored alongside the session UUID
 * @param fileToRecover    metadata of the file this stream reads
 * @param ccrSettings      CCR settings; the rate limiter is taken from here
 * @param throttleListener listener stored for this stream
 */
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
                               CcrSettings ccrSettings, LongConsumer throttleListener) {
    this.remoteClient = remoteClient;
    this.sessionUUID = sessionUUID;
    this.node = node;
    this.fileToRecover = fileToRecover;
    this.ccrSettings = ccrSettings;
    // The final rateLimiter field is assigned exactly once, from the settings object.
    this.rateLimiter = ccrSettings.getRateLimiter();
    this.throttleListener = throttleListener;
}

Expand All @@ -391,7 +398,7 @@ public int read(byte[] bytes, int off, int len) throws IOException {
String fileName = fileToRecover.name();
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet();
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
BytesReference fileChunk = response.getChunk();

int bytesReceived = fileChunk.length();
Expand Down