Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local session timeouts to leader node #37438

Merged
merged 10 commits into from
Jan 18, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ public Collection<Object> createComponents(
return emptyList();
}

CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
this.restoreSourceService.set(restoreSourceService);
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
this.ccrSettings.set(ccrSettings);
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public final class CcrSettings {
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
* the leader will close the restore session.
*/
public static final Setting<TimeValue> INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING =
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -53,22 +61,33 @@ static List<Setting<?>> getSettings() {
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
}

private final CombinedRateLimiter ccrRateLimiter;
private volatile TimeValue recoveryActivityTimeout;

public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
}

private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) {
this.recoveryActivityTimeout = recoveryActivityTimeout;
}

public CombinedRateLimiter getRateLimiter() {
return ccrRateLimiter;
}

public TimeValue getRecoveryActivityTimeout() {
return recoveryActivityTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
Expand All @@ -28,6 +29,9 @@
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -45,8 +49,14 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen

private final Map<String, RestoreSession> onGoingRestores = ConcurrentCollections.newConcurrentMap();
private final Map<IndexShard, HashSet<String>> sessionsForShard = new HashMap<>();
private final CopyOnWriteArrayList<Consumer<String>> openSessionListeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
private final ThreadPool threadPool;
private final CcrSettings ccrSettings;

public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
this.threadPool = threadPool;
this.ccrSettings = ccrSettings;
}

@Override
public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
Expand Down Expand Up @@ -81,26 +91,10 @@ protected synchronized void doClose() throws IOException {

// TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested,
// these should be removed.
public void addOpenSessionListener(Consumer<String> listener) {
openSessionListeners.add(listener);
}

public void addCloseSessionListener(Consumer<String> listener) {
closeSessionListeners.add(listener);
}

// default visibility for testing
synchronized HashSet<String> getSessionsForShard(IndexShard indexShard) {
return sessionsForShard.get(indexShard);
}

// default visibility for testing
synchronized RestoreSession getOngoingRestore(String sessionUUID) {
return onGoingRestores.get(sessionUUID);
}

// TODO: Add a local timeout for the session. This timeout might might be for the entire session to be
// complete. Or it could be for session to have been touched.
public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
boolean success = false;
RestoreSession restore = null;
Expand All @@ -113,9 +107,8 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index
if (indexShard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed");
}
restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit(), scheduleTimeout(sessionUUID));
onGoingRestores.put(sessionUUID, restore);
openSessionListeners.forEach(c -> c.accept(sessionUUID));
HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
sessions.add(sessionUUID);
}
Expand All @@ -133,49 +126,79 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index
}

public void closeSession(String sessionUUID) {
internalCloseSession(sessionUUID, true);
}

public synchronized SessionReader getSessionReader(String sessionUUID) {
RestoreSession restore = onGoingRestores.get(sessionUUID);
if (restore == null) {
logger.debug("could not get session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
}
restore.idle = false;
return new SessionReader(restore);
}

private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
final RestoreSession restore;
synchronized (this) {
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
restore = onGoingRestores.remove(sessionUUID);
if (restore == null) {
logger.debug("could not close session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
if (throwIfSessionMissing) {
logger.debug("could not close session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
} else {
return;
}
}
HashSet<String> sessions = sessionsForShard.get(restore.indexShard);
assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores";
if (sessions != null) {
boolean removed = sessions.remove(sessionUUID);
assert removed : "No session found for UUID [" + sessionUUID +"]";
assert removed : "No session found for UUID [" + sessionUUID + "]";
if (sessions.isEmpty()) {
sessionsForShard.remove(restore.indexShard);
}
}
}
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
restore.decRef();

}

public synchronized SessionReader getSessionReader(String sessionUUID) {
RestoreSession restore = onGoingRestores.get(sessionUUID);
if (restore == null) {
logger.debug("could not get session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
private Scheduler.Cancellable scheduleTimeout(String sessionUUID) {
TimeValue idleTimeout = ccrSettings.getRecoveryActivityTimeout();
return threadPool.scheduleWithFixedDelay(() -> maybeTimeout(sessionUUID), idleTimeout, ThreadPool.Names.GENERIC);
}

private void maybeTimeout(String sessionUUID) {
RestoreSession restoreSession = onGoingRestores.get(sessionUUID);
if (restoreSession != null) {
if (restoreSession.idle) {
internalCloseSession(sessionUUID, false);
} else {
restoreSession.idle = true;
}
}
return new SessionReader(restore);
}

private static class RestoreSession extends AbstractRefCounted {

private final String sessionUUID;
private final IndexShard indexShard;
private final Engine.IndexCommitRef commitRef;
private final Scheduler.Cancellable timeoutTask;
private final KeyedLock<String> keyedLock = new KeyedLock<>();
private final Map<String, IndexInput> cachedInputs = new ConcurrentHashMap<>();
private volatile boolean idle = false;

private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) {
private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef,
Scheduler.Cancellable timeoutTask) {
super("restore-session");
this.sessionUUID = sessionUUID;
this.indexShard = indexShard;
this.commitRef = commitRef;
this.timeoutTask = timeoutTask;
}

private Store.MetadataSnapshot getMetaData() throws IOException {
Expand Down Expand Up @@ -223,6 +246,7 @@ private long readFileBytes(String fileName, BytesReference reference) throws IOE
protected void closeInternal() {
logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing";
timeoutTask.cancel();
IOUtils.closeWhileHandlingException(cachedInputs.values());
}
}
Expand Down
Loading