Skip to content

Commit

Permalink
Reverting Engine Changes
Browse files Browse the repository at this point in the history
Signed-off-by: Satyajit Ganguly <[email protected]>
  • Loading branch information
Satyajit Ganguly committed Jul 7, 2022
1 parent 289c2d3 commit 91779f7
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 9 deletions.
82 changes: 80 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -109,6 +110,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -118,7 +120,7 @@
*
* @opensearch.internal
*/
public abstract class Engine implements LifecycleAware, Closeable {
public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0
public static final String HISTORY_UUID_KEY = "history_uuid";
Expand Down Expand Up @@ -349,6 +351,12 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();

/**
 * Trims translog operations for terms below <code>belowTerm</code> and sequence numbers above <code>aboveSeqNo</code>.
 *
 * @param belowTerm  only operations with a term below this value are trimmed
 * @param aboveSeqNo only operations with a sequence number above this value are trimmed
 * @throws EngineException if the operations could not be trimmed
 * @see Translog#trimOperations(long, long)
 */
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/**
* A Lock implementation that always allows the lock to be acquired
*
Expand Down Expand Up @@ -781,6 +789,18 @@ public enum SearcherScope {
INTERNAL
}

/**
 * Checks if the underlying storage sync is required.
 *
 * @return {@code true} if a sync of the underlying storage is required
 */
public abstract boolean isTranslogSyncNeeded();

/**
 * Ensures that all locations in the given stream have been written to the underlying storage.
 *
 * @param locations the translog locations that must be durable when this method returns
 * @return NOTE(review): the meaning of the result is not documented here; presumably {@code true} when this
 *         call actually performed a sync. Confirm against the translog manager implementation.
 * @throws IOException if writing to the underlying storage fails
 */
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

/**
 * Syncs the translog of this engine.
 * NOTE(review): presumably this forces all pending translog writes to the underlying storage; confirm.
 *
 * @throws IOException if the sync fails
 */
public abstract void syncTranslog() throws IOException;

/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
Expand Down Expand Up @@ -816,6 +836,13 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();

/**
 * Returns the translog statistics of this engine.
 * NOTE(review): whether {@code null} is a permitted result is not specified here; confirm with callers.
 */
public abstract TranslogStats getTranslogStats();

/**
 * Returns the last location that the translog of this engine has written into.
 *
 * @return the most recent translog write location
 */
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down Expand Up @@ -1146,6 +1173,25 @@ public final void flush() throws EngineException {
flush(false, false);
}

/**
 * Checks for and removes translog files that no longer need to be retained. See
 * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details.
 *
 * @throws EngineException if the translog files could not be trimmed
 */
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
 * Tests whether or not the translog generation should be rolled to a new generation.
 * This test is based on the size of the current generation compared to the configured generation threshold size.
 *
 * @return {@code true} if the current generation should be rolled to a new generation, {@code false} otherwise
 */
public abstract boolean shouldRollTranslogGeneration();

/**
 * Rolls the translog to a new generation and cleans up what is no longer needed.
 * NOTE(review): the original sentence ended at "cleans unneeded"; presumably it refers to translog files that
 * are no longer referenced. Confirm.
 *
 * @throws EngineException if rolling the generation fails
 */
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Triggers a forced merge on this engine
*/
Expand Down Expand Up @@ -1957,6 +2003,14 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
 * Replays the translog to restore the Lucene index, which might have been reverted previously.
 * This ensures that all acknowledged writes are restored correctly when this engine is promoted.
 *
 * @param translogRecoveryRunner the runner used to replay the translog operations
 * @return the number of translog operations that have been recovered
 * @throws IOException if replaying the translog fails
 */
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
Expand All @@ -1965,6 +2019,20 @@ public interface Warmer {
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
 * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
 * This operation will close the engine if the recovery fails.
 *
 * @param translogRecoveryRunner the translog recovery runner
 * @param recoverUpToSeqNo       the upper bound, inclusive, of sequence number to be recovered
 * @return the engine after recovery
 * @throws IOException if the recovery fails
 */
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
 * Makes the engine ready without replaying any translog operations.
 */
public abstract void skipTranslogRecovery();

/**
* Tries to prune buffered deletes from the version map.
*/
Expand All @@ -1985,6 +2053,16 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

/**
 * The runner for translog recovery
 *
 * @opensearch.internal
 */
@FunctionalInterface
public interface TranslogRecoveryRunner {
    /**
     * Runs recovery for the given engine using the operations of the given translog snapshot.
     *
     * @param engine   the engine being recovered
     * @param snapshot the translog snapshot supplying the operations
     * @return presumably the number of operations recovered (callers store the result as the recovered-operations
     *         count); confirm against implementations
     * @throws IOException if the recovery run fails
     */
    int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of either update or delete operations that have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
Expand Down
113 changes: 107 additions & 6 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
Expand Down Expand Up @@ -104,12 +105,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.InternalTranslogManager;
import org.opensearch.index.translog.*;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
Expand Down Expand Up @@ -175,6 +171,7 @@ public class InternalEngine extends Engine {
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
Expand Down Expand Up @@ -481,6 +478,21 @@ final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}

/**
 * Reports whether the translog needs to be synced, as answered by the translog manager.
 */
@Override
public boolean isTranslogSyncNeeded() {
    final boolean syncNeeded = translogManager.isTranslogSyncNeeded();
    return syncNeeded;
}

/**
 * Hands the given locations to the translog manager and returns its answer unchanged.
 */
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
    final boolean result = translogManager.ensureTranslogSynced(locations);
    return result;
}

/**
 * Syncs the translog by delegating to the translog manager.
 *
 * @throws IOException if the translog manager fails to sync
 */
@Override
public void syncTranslog() throws IOException {
translogManager.syncTranslog();
}

@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down Expand Up @@ -510,6 +522,60 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

/**
 * Recovers this engine from its translog up to {@code recoverUpToSeqNo} while holding the read lock.
 * Fails with {@link IllegalStateException} when no translog recovery is pending. If the recovery itself
 * fails, the engine is failed and the original exception is rethrown.
 *
 * @param translogRecoveryRunner the runner that replays the translog operations
 * @param recoverUpToSeqNo       the upper bound of sequence numbers to recover
 * @return this engine
 */
@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        final boolean recoveryPending = pendingTranslogRecovery.get();
        if (!recoveryPending) {
            throw new IllegalStateException("Engine has already been recovered");
        }
        try {
            recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
        } catch (Exception failure) {
            try {
                // keep the recovery flagged as pending so that no commit is ever allowed on this engine
                pendingTranslogRecovery.set(true);
                failEngine("failed to recover from translog", failure);
            } catch (Exception secondary) {
                failure.addSuppressed(secondary);
            }
            throw failure;
        }
    }
    return this;
}

/**
 * Skips translog recovery by delegating to the translog manager.
 */
@Override
public void skipTranslogRecovery() {
translogManager.skipTranslogRecovery();
}

/**
 * Replays translog operations from just above the processed local checkpoint up to {@code recoverUpToSeqNo},
 * then clears the pending-recovery flag, flushes and trims unreferenced translog readers.
 *
 * @param translogRecoveryRunner the runner that replays the snapshot
 * @param recoverUpToSeqNo       the upper bound of sequence numbers to recover
 */
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
    final long processedCheckpoint = getProcessedLocalCheckpoint();
    final int opsRecovered;
    if (recoverUpToSeqNo <= processedCheckpoint) {
        // nothing above the checkpoint is requested, so there is nothing to replay
        opsRecovered = 0;
    } else {
        try (Translog.Snapshot translogSnapshot = translogManager.getTranslog().newSnapshot(processedCheckpoint + 1, recoverUpToSeqNo)) {
            opsRecovered = translogRecoveryRunner.run(this, translogSnapshot);
        } catch (Exception cause) {
            throw new EngineException(shardId, "failed to recover from translog", cause);
        }
    }
    // Flush regardless of whether anything was recovered: references to older translogs may still exist.
    // Zero recovered operations together with older translogs means those are corrupted or of zero length.
    assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
    // recovery succeeded, commits are allowed from here on
    pendingTranslogRecovery.set(false);
    logger.trace(
        () -> new ParameterizedMessage(
            "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
            opsRecovered,
            translogManager.getTranslog().currentFileGeneration()
        )
    );
    flush(false, true);
    translogManager.getTranslog().trimUnreferencedReaders();
}

private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) {
Expand Down Expand Up @@ -1879,6 +1945,21 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}
}

/**
 * Removes translog files that no longer need to be retained.
 * <p>
 * This was an empty stub, so unreferenced translog files were never trimmed. It now trims the unreferenced
 * readers of the translog while holding the read lock. Any failure other than the engine already being
 * closed fails the engine and is rethrown as an {@link EngineException}.
 *
 * @throws EngineException if trimming the translog fails
 */
@Override
public void trimUnreferencedTranslogFiles() throws EngineException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        translogManager.getTranslog().trimUnreferencedReaders();
    } catch (AlreadyClosedException e) {
        // NOTE(review): a closed engine is reported as-is; confirm whether tragic-event handling must also run here
        throw e;
    } catch (Exception e) {
        try {
            failEngine("translog trimming failed", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw new EngineException(shardId, "failed to trim translog", e);
    }
}

/**
 * Tests whether the current translog generation should be rolled to a new generation.
 * <p>
 * This was a stub that always returned {@code false}, which would prevent the translog generation from ever
 * being rolled. The decision is now taken by the translog itself.
 *
 * @return {@code true} if the translog reports that its current generation should be rolled
 */
@Override
public boolean shouldRollTranslogGeneration() {
    return translogManager.getTranslog().shouldRollGeneration();
}

/**
 * Rolls the translog to a new generation and trims translog readers that are no longer referenced.
 * <p>
 * This was an empty stub, so the translog generation was never rolled. Both steps run while holding the read
 * lock. Any failure other than the engine already being closed fails the engine and is rethrown as an
 * {@link EngineException}.
 *
 * @throws EngineException if rolling or trimming the translog fails
 */
@Override
public void rollTranslogGeneration() throws EngineException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        translogManager.getTranslog().rollGeneration();
        translogManager.getTranslog().trimUnreferencedReaders();
    } catch (AlreadyClosedException e) {
        // NOTE(review): a closed engine is reported as-is; confirm whether tragic-event handling must also run here
        throw e;
    } catch (Exception e) {
        try {
            failEngine("translog trimming failed", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw new EngineException(shardId, "failed to roll translog", e);
    }
}

private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
Expand Down Expand Up @@ -2364,11 +2445,21 @@ public void deactivateThrottling() {
}
}

/**
 * Replays the translog to restore local history above the processed local checkpoint.
 * <p>
 * This was a stub returning {@code 0} without replaying anything, so acknowledged writes held only in the
 * translog were not restored. It now snapshots every operation above the processed local checkpoint and
 * hands the snapshot to the runner, all while holding the read lock.
 *
 * @param translogRecoveryRunner the runner that replays the translog snapshot
 * @return the value returned by the runner for the replayed snapshot
 * @throws IOException if creating, replaying or closing the snapshot fails
 */
@Override
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        final long localCheckpoint = getProcessedLocalCheckpoint();
        // no upper bound: everything above the checkpoint has to be replayed
        try (Translog.Snapshot snapshot = translogManager.getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
            return translogRecoveryRunner.run(this, snapshot);
        }
    }
}

/**
 * Reports the throttling state as answered by the index throttle.
 */
@Override
public boolean isThrottled() {
    final boolean throttled = throttle.isThrottled();
    return throttled;
}

/**
 * Trims translog operations for terms below {@code belowTerm} and sequence numbers above {@code aboveSeqNo}.
 * <p>
 * This was an empty stub, so the requested operations were never trimmed. It now delegates to
 * {@link Translog#trimOperations(long, long)} while holding the read lock. Any failure other than the engine
 * already being closed fails the engine and is rethrown as an {@link EngineException}.
 *
 * @param belowTerm  only operations with a term below this value are trimmed
 * @param aboveSeqNo only operations with a sequence number above this value are trimmed
 * @throws EngineException if trimming the operations fails
 */
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        translogManager.getTranslog().trimOperations(belowTerm, aboveSeqNo);
    } catch (AlreadyClosedException e) {
        // NOTE(review): a closed engine is reported as-is; confirm whether tragic-event handling must also run here
        throw e;
    } catch (Exception e) {
        try {
            failEngine("translog operations trimming failed", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw new EngineException(shardId, "failed to trim translog operations", e);
    }
}

/**
 * Reports whether the throttle lock is held by the calling thread.
 * To be used in assertions and tests only.
 */
boolean throttleLockIsHeldByCurrentThread() {
    final boolean heldByCaller = throttle.throttleLockIsHeldByCurrentThread();
    return heldByCaller;
}
Expand Down Expand Up @@ -2707,6 +2798,16 @@ public final long getMinRetainedSeqNo() {
return softDeletesPolicy.getMinRetainedSeqNo();
}

/**
 * Returns the statistics of the translog backing this engine.
 * <p>
 * This was a stub returning {@code null}, which left callers without any translog statistics. The statistics
 * are now read from the translog.
 *
 * @return the current translog statistics
 */
@Override
public TranslogStats getTranslogStats() {
    return translogManager.getTranslog().stats();
}

/**
 * Returns the last location that the translog of this engine has written into.
 * <p>
 * This was a stub returning {@code null}, which breaks callers that use the location. The location is now
 * read from the translog.
 *
 * @return the most recent translog write location
 */
@Override
public Translog.Location getTranslogLastWriteLocation() {
    return translogManager.getTranslog().getLastWriteLocation();
}

/**
 * Acquires a retention lock from the soft-deletes policy.
 *
 * @return the handle that releases the retention lock when closed
 */
public Closeable acquireHistoryRetentionLock() {
    final Closeable retentionLock = softDeletesPolicy.acquireRetentionLock();
    return retentionLock;
}
Expand Down
Loading

0 comments on commit 91779f7

Please sign in to comment.