diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java
index 5868698416c30..7a03adbd13f4e 100644
--- a/server/src/main/java/org/opensearch/index/engine/Engine.java
+++ b/server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -80,10 +80,11 @@
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
+import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
-import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
+import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
import java.io.Closeable;
@@ -109,6 +110,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.stream.Stream;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -118,7 +120,7 @@
*
* @opensearch.internal
*/
-public abstract class Engine implements LifecycleAware, Closeable {
+public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0
public static final String HISTORY_UUID_KEY = "history_uuid";
@@ -349,6 +351,12 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();
+ /**
+ * Trims translog for terms below belowTerm
and seq# above aboveSeqNo
+ * @see Translog#trimOperations(long, long)
+ */
+ public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;
+
/**
* A Lock implementation that always allows the lock to be acquired
*
@@ -781,6 +789,18 @@ public enum SearcherScope {
INTERNAL
}
+ /**
+ * Checks if the underlying storage sync is required.
+ */
+ public abstract boolean isTranslogSyncNeeded();
+
+ /**
+ * Ensures that all locations in the given stream have been written to the underlying storage.
+ */
+ public abstract boolean ensureTranslogSynced(Stream locations) throws IOException;
+
+ public abstract void syncTranslog() throws IOException;
+
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
@@ -816,6 +836,13 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();
+ public abstract TranslogStats getTranslogStats();
+
+ /**
+ * Returns the last location that the translog of this engine has written into.
+ */
+ public abstract Translog.Location getTranslogLastWriteLocation();
+
protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
@@ -1146,6 +1173,25 @@ public final void flush() throws EngineException {
flush(false, false);
}
+ /**
+ * checks and removes translog files that no longer need to be retained. See
+ * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
+ */
+ public abstract void trimUnreferencedTranslogFiles() throws EngineException;
+
+ /**
+ * Tests whether or not the translog generation should be rolled to a new generation.
+ * This test is based on the size of the current generation compared to the configured generation threshold size.
+ *
+ * @return {@code true} if the current generation should be rolled to a new generation
+ */
+ public abstract boolean shouldRollTranslogGeneration();
+
+ /**
+ * Rolls the translog generation and cleans unneeded.
+ */
+ public abstract void rollTranslogGeneration() throws EngineException;
+
/**
* Triggers a forced merge on this engine
*/
@@ -1957,6 +2003,14 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();
+ /**
+ * This method replays translog to restore the Lucene index which might be reverted previously.
+ * This ensures that all acknowledged writes are restored correctly when this engine is promoted.
+ *
+ * @return the number of translog operations have been recovered
+ */
+ public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;
+
/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
@@ -1965,6 +2019,20 @@ public interface Warmer {
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
+ /**
+ * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
+ * This operation will close the engine if the recovery fails.
+ *
+ * @param translogRecoveryRunner the translog recovery runner
+ * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
+ */
+ public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
+
+ /**
+ * Do not replay translog operations, but make the engine be ready.
+ */
+ public abstract void skipTranslogRecovery();
+
/**
* Tries to prune buffered deletes from the version map.
*/
@@ -1985,6 +2053,16 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);
+ /**
+ * The runner for translog recovery
+ *
+ * @opensearch.internal
+ */
+ @FunctionalInterface
+ public interface TranslogRecoveryRunner {
+ int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
+ }
+
/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index 69a168eb04ac7..5f0f14b04be6a 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -33,6 +33,7 @@
package org.opensearch.index.engine;
import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@@ -104,12 +105,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
-import org.opensearch.index.translog.Translog;
-import org.opensearch.index.translog.TranslogCorruptedException;
-import org.opensearch.index.translog.TranslogDeletionPolicy;
-import org.opensearch.index.translog.TranslogManager;
-import org.opensearch.index.translog.TranslogException;
-import org.opensearch.index.translog.InternalTranslogManager;
+import org.opensearch.index.translog.*;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
@@ -175,6 +171,7 @@ public class InternalEngine extends Engine {
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
+ private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
@@ -481,6 +478,21 @@ final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}
+ @Override
+ public boolean isTranslogSyncNeeded() {
+ return translogManager.isTranslogSyncNeeded();
+ }
+
+ @Override
+ public boolean ensureTranslogSynced(Stream locations) throws IOException {
+ return translogManager.ensureTranslogSynced(locations);
+ }
+
+ @Override
+ public void syncTranslog() throws IOException {
+ translogManager.syncTranslog();
+ }
+
@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
@@ -510,6 +522,60 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}
+ @Override
+ public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
+ try (ReleasableLock lock = readLock.acquire()) {
+ ensureOpen();
+ if (pendingTranslogRecovery.get() == false) {
+ throw new IllegalStateException("Engine has already been recovered");
+ }
+ try {
+ recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
+ } catch (Exception e) {
+ try {
+ pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
+ failEngine("failed to recover from translog", e);
+ } catch (Exception inner) {
+ e.addSuppressed(inner);
+ }
+ throw e;
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public void skipTranslogRecovery() {
+ translogManager.skipTranslogRecovery();
+ }
+
+ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
+ final int opsRecovered;
+ final long localCheckpoint = getProcessedLocalCheckpoint();
+ if (localCheckpoint < recoverUpToSeqNo) {
+ try (Translog.Snapshot snapshot = translogManager.getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
+ opsRecovered = translogRecoveryRunner.run(this, snapshot);
+ } catch (Exception e) {
+ throw new EngineException(shardId, "failed to recover from translog", e);
+ }
+ } else {
+ opsRecovered = 0;
+ }
+ // flush if we recovered something or if we have references to older translogs
+ // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
+ assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
+ pendingTranslogRecovery.set(false); // we are good - now we can commit
+ logger.trace(
+ () -> new ParameterizedMessage(
+ "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
+ opsRecovered,
+ translogManager.getTranslog().currentFileGeneration()
+ )
+ );
+ flush(false, true);
+ translogManager.getTranslog().trimUnreferencedReaders();
+ }
+
private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry entry : writer.getLiveCommitData()) {
if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) {
@@ -1879,6 +1945,21 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}
}
+ @Override
+ public void trimUnreferencedTranslogFiles() throws EngineException {
+
+ }
+
+ @Override
+ public boolean shouldRollTranslogGeneration() {
+ return false;
+ }
+
+ @Override
+ public void rollTranslogGeneration() throws EngineException {
+
+ }
+
private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
@@ -2364,11 +2445,21 @@ public void deactivateThrottling() {
}
}
+ @Override
+ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
+ return 0;
+ }
+
@Override
public boolean isThrottled() {
return throttle.isThrottled();
}
+ @Override
+ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
+
+ }
+
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
return throttle.throttleLockIsHeldByCurrentThread();
}
@@ -2707,6 +2798,16 @@ public final long getMinRetainedSeqNo() {
return softDeletesPolicy.getMinRetainedSeqNo();
}
+ @Override
+ public TranslogStats getTranslogStats() {
+ return null;
+ }
+
+ @Override
+ public Translog.Location getTranslogLastWriteLocation() {
+ return null;
+ }
+
public Closeable acquireHistoryRetentionLock() {
return softDeletesPolicy.acquireRetentionLock();
}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index af175be286b13..b6ebcb5f7e9fa 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -27,6 +27,7 @@
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
+import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
@@ -38,6 +39,7 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
+import java.util.stream.Stream;
/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
@@ -46,7 +48,7 @@
*
* @opensearch.internal
*/
-public class NRTReplicationEngine extends Engine {
+public class NRTReplicationEngine extends Engine implements LifecycleAware {
private volatile SegmentInfos lastCommittedSegmentInfos;
private final NRTReplicationReaderManager readerManager;
@@ -147,6 +149,11 @@ public boolean isThrottled() {
return false;
}
+ @Override
+ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
+ translogManager.trimOperationsFromTranslog(belowTerm, aboveSeqNo);
+ }
+
@Override
public IndexResult index(Index index) throws IOException {
ensureOpen();
@@ -193,6 +200,21 @@ protected ReferenceManager getReferenceManager(Search
return readerManager;
}
+ @Override
+ public boolean isTranslogSyncNeeded() {
+ return translogManager.isTranslogSyncNeeded();
+ }
+
+ @Override
+ public boolean ensureTranslogSynced(Stream locations) throws IOException {
+ return translogManager.ensureTranslogSynced(locations);
+ }
+
+ @Override
+ public void syncTranslog() throws IOException {
+ translogManager.syncTranslog();
+ }
+
@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
@@ -224,6 +246,16 @@ public long getMinRetainedSeqNo() {
return localCheckpointTracker.getProcessedCheckpoint();
}
+ @Override
+ public TranslogStats getTranslogStats() {
+ return translogManager.getTranslogStats();
+ }
+
+ @Override
+ public Translog.Location getTranslogLastWriteLocation() {
+ return translogManager.getTranslogLastWriteLocation();
+ }
+
@Override
public long getPersistedLocalCheckpoint() {
return localCheckpointTracker.getPersistedCheckpoint();
@@ -273,6 +305,21 @@ public boolean shouldPeriodicallyFlush() {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
+ @Override
+ public void trimUnreferencedTranslogFiles() throws EngineException {
+ translogManager.trimUnreferencedTranslogFiles();
+ }
+
+ @Override
+ public boolean shouldRollTranslogGeneration() {
+ return translogManager.shouldRollTranslogGeneration();
+ }
+
+ @Override
+ public void rollTranslogGeneration() throws EngineException {
+ translogManager.rollTranslogGeneration();
+ }
+
@Override
public void forceMerge(
boolean flush,
@@ -325,11 +372,26 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}
+ @Override
+ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
+ return 0;
+ }
+
@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
return 0;
}
+ @Override
+ public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
+ throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
+ }
+
+ @Override
+ public void skipTranslogRecovery() {
+ translogManager.skipTranslogRecovery();
+ }
+
@Override
public void maybePruneDeletes() {}
diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
index cebe262fee5d1..5e8b6a60305da 100644
--- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
@@ -44,6 +44,7 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
+import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
@@ -66,6 +67,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.stream.Stream;
/**
* A basic read-only engine that allows switching a shard to be true read-only temporarily or permanently.
@@ -334,6 +336,21 @@ public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException("no-ops are not supported on a read-only engine");
}
+ @Override
+ public boolean isTranslogSyncNeeded() {
+ return translogManager.isTranslogSyncNeeded();
+ }
+
+ @Override
+ public boolean ensureTranslogSynced(Stream locations) throws IOException {
+ return translogManager.ensureTranslogSynced(locations);
+ }
+
+ @Override
+ public void syncTranslog() throws IOException {
+ translogManager.syncTranslog();
+ }
+
@Override
public Closeable acquireHistoryRetentionLock() {
return () -> {};
@@ -367,6 +384,16 @@ public long getMinRetainedSeqNo() {
throw new UnsupportedOperationException();
}
+ @Override
+ public TranslogStats getTranslogStats() {
+ return translogManager.getTranslogStats();
+ }
+
+ @Override
+ public Translog.Location getTranslogLastWriteLocation() {
+ return translogManager.getTranslogLastWriteLocation();
+ }
+
@Override
public long getPersistedLocalCheckpoint() {
return seqNoStats.getLocalCheckpoint();
@@ -453,11 +480,54 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}
+ @Override
+ public void trimUnreferencedTranslogFiles() {
+ translogManager.trimUnreferencedTranslogFiles();
+ }
+
+ @Override
+ public boolean shouldRollTranslogGeneration() {
+ return translogManager.shouldRollTranslogGeneration();
+ }
+
+ @Override
+ public void rollTranslogGeneration() {
+ translogManager.rollTranslogGeneration();
+ }
+
+ @Override
+ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
+ return 0;
+ }
+
@Override
public int fillSeqNoGaps(long primaryTerm) {
return 0;
}
+ @Override
+ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
+ try (ReleasableLock lock = readLock.acquire()) {
+ ensureOpen();
+ try (Translog.Snapshot snapshot = newEmptySnapshot()) {
+ translogRecoveryRunner.run(this, snapshot);
+ } catch (final Exception e) {
+ throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public void skipTranslogRecovery() {
+ translogManager.skipTranslogRecovery();
+ }
+
+ @Override
+ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
+ translogManager.trimOperationsFromTranslog(belowTerm, aboveSeqNo);
+ }
+
@Override
public void maybePruneDeletes() {}