Skip to content

Commit

Permalink
Pass TranslogRecoveryRunner to engine from outside
Browse files Browse the repository at this point in the history
This commit allows us to use different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR (elastic#32867).

See elastic#32867 (comment)
  • Loading branch information
dnhatn committed Sep 5, 2018
1 parent 5c624bc commit 5c19187
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 78 deletions.
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1642,9 +1642,10 @@ public interface Warmer {
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
* @param translogRecoveryRunner the translog recovery runner
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
Expand All @@ -1662,4 +1663,9 @@ public boolean isRecovering() {
* Tries to prune buffered deletes from the version map.
*/
public abstract void maybePruneDeletes();

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -76,7 +74,6 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
@Nullable
private final Sort indexSort;
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -127,9 +124,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -163,7 +159,6 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
Expand Down Expand Up @@ -324,18 +319,6 @@ public TranslogConfig getTranslogConfig() {
*/
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns a runner that implements the translog recovery from the given snapshot
*/
public TranslogRecoveryRunner getTranslogRecoveryRunner() {
return translogRecoveryRunner;
}

/**
* The refresh listeners to add to Lucene for externally visible refreshes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
}

@Override
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal(recoverUpToSeqNo);
recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -423,13 +423,13 @@ public void skipTranslogRecovery() {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(Long.MAX_VALUE);
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -2233,7 +2233,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
}

/**
Expand Down
Loading

0 comments on commit 5c19187

Please sign in to comment.