diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4b5fbd4ef84a6..9079bd1814a25 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -80,8 +80,11 @@ import org.opensearch.index.shard.DocsStats; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.TranslogStats; import org.opensearch.search.suggest.completion.CompletionStats; import java.io.Closeable; @@ -899,6 +902,22 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl return stats; } + protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) { + TranslogDeletionPolicy customTranslogDeletionPolicy = null; + if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) { + customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory() + .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier()); + } + return Objects.requireNonNullElseGet( + customTranslogDeletionPolicy, + () -> new DefaultTranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + ) + ); + } + protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) { stats.add(1); if (includeSegmentFileSizes) { diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index c81022f1554f0..256470f6841be 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -104,12 +104,15 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.TranslogException; +import org.opensearch.index.translog.InternalTranslogManager; +import org.opensearch.index.translog.listener.TranslogEventListener; +import org.opensearch.index.translog.listener.CompositeTranslogEventListener; import org.opensearch.search.suggest.completion.CompletionStats; import org.opensearch.threadpool.ThreadPool; @@ -149,7 +152,7 @@ public class InternalEngine extends Engine { */ private volatile long lastDeleteVersionPruneTimeMSec; - private final Translog translog; + private final InternalTranslogManager translogManager; private final OpenSearchConcurrentMergeScheduler mergeScheduler; private final IndexWriter indexWriter; @@ -176,7 +179,6 @@ public class InternalEngine extends Engine { // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. @@ -221,36 +223,35 @@ public class InternalEngine extends Engine { private volatile String forceMergeUUID; public InternalEngine(EngineConfig engineConfig) { - this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); + this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } - InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) { + public InternalEngine(EngineConfig engineConfig, TranslogEventListener translogEventListener) { + this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, translogEventListener); + } + + public InternalTranslogManager translogManager() { + return translogManager; + } + + InternalEngine( + EngineConfig engineConfig, + int maxDocs, + BiFunction localCheckpointTrackerSupplier, + TranslogEventListener translogEventListener + ) { super(engineConfig); this.maxDocs = maxDocs; if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { updateAutoIdTimestamp(Long.MAX_VALUE, true); } - final TranslogDeletionPolicy translogDeletionPolicy; - TranslogDeletionPolicy customTranslogDeletionPolicy = null; - if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) { - customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory() - .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier()); - } - if (customTranslogDeletionPolicy != null) { - translogDeletionPolicy = customTranslogDeletionPolicy; - } else { - translogDeletionPolicy = new DefaultTranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() - ); - } + final TranslogDeletionPolicy translogDeletionPolicy = getTranslogDeletionPolicy(engineConfig); store.incRef(); IndexWriter writer = null; - Translog translog = null; ExternalReaderManager externalReaderManager = null; OpenSearchReaderManager internalReaderManager = null; EngineMergeScheduler scheduler = null; + InternalTranslogManager translogManagerRef = null; boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); @@ -258,21 +259,48 @@ public InternalEngine(EngineConfig engineConfig) { throttle = new IndexThrottle(); try { store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath()); - translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> { - final LocalCheckpointTracker tracker = getLocalCheckpointTracker(); - assert tracker != null || getTranslog().isOpen() == false; - if (tracker != null) { - tracker.markSeqNoAsPersisted(seqNo); + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); + TranslogEventListener internalTranslogEventListener = new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + revisitIndexDeletionPolicyOnTranslogSynced(); } - }); - assert translog.getGeneration() != null; - this.translog = translog; + + @Override + public void onAfterTranslogRecovery() { + flush(false, true); + translogManager.trimUnreferencedTranslogFiles(); + } + + @Override + public void onFailure(String reason, Exception ex) { + if (ex instanceof AlreadyClosedException) { + failOnTragicEvent((AlreadyClosedException) ex); + } else { + failEngine(reason, ex); + } + } + }; + translogManagerRef = new InternalTranslogManager( + engineConfig.getTranslogConfig(), + engineConfig.getPrimaryTermSupplier(), + engineConfig.getGlobalCheckpointSupplier(), + translogDeletionPolicy, + shardId, + readLock, + () -> getLocalCheckpointTracker(), + translogUUID, + new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), + this::ensureOpen + ); + this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy( logger, translogDeletionPolicy, softDeletesPolicy, - translog::getLastSyncedGlobalCheckpoint + translogManager.getTranslog()::getLastSyncedGlobalCheckpoint ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); @@ -297,9 +325,6 @@ public InternalEngine(EngineConfig engineConfig) { this.internalReaderManager = internalReaderManager; this.externalReaderManager = externalReaderManager; internalReaderManager.addListener(versionMap); - assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; - // don't allow commits until we are done with recovering - pendingTranslogRecovery.set(true); for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { this.externalReaderManager.addListener(listener); } @@ -308,7 +333,9 @@ public InternalEngine(EngineConfig engineConfig) { } this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); - maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); + maxSeqNoOfUpdatesOrDeletes = new AtomicLong( + SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo()) + ); if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); @@ -325,6 +352,10 @@ public InternalEngine(EngineConfig engineConfig) { success = true; } finally { if (success == false) { + Translog translog = null; + if (translogManagerRef != null) { + translog = translogManagerRef.getTranslog(); + } IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference @@ -358,7 +389,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; } return new SoftDeletesPolicy( - translog::getLastSyncedGlobalCheckpoint, + translogManager.getTranslog()::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(), engineConfig.retentionLeasesSupplier() @@ -462,7 +493,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = translogManager().getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(this, snapshot); } } @@ -486,7 +517,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { + "]"; } - syncTranslog(); // to persist noops associated with the advancement of the local checkpoint + translogManager.syncTranslog(); // to persist noops associated with the advancement of the local checkpoint assert localCheckpointTracker.getPersistedCheckpoint() == maxSeqNo : "persisted local checkpoint did not advance to max seq no; is [" + localCheckpointTracker.getPersistedCheckpoint() @@ -512,14 +543,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - if (pendingTranslogRecovery.get() == false) { + if (translogManager().getPendingTranslogRecovery().get() == false) { throw new IllegalStateException("Engine has already been recovered"); } try { recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo); } catch (Exception e) { try { - pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush + translogManager().getPendingTranslogRecovery().set(true); // just play safe and never allow commits on this see + // #ensureCanFlush failEngine("failed to recover from translog", e); } catch (Exception inner) { e.addSuppressed(inner); @@ -532,15 +564,14 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover @Override public void skipTranslogRecovery() { - assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; - pendingTranslogRecovery.set(false); // we are good - now we can commit + translogManager.skipTranslogRecovery(); } private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { final int opsRecovered; final long localCheckpoint = getProcessedLocalCheckpoint(); if (localCheckpoint < recoverUpToSeqNo) { - try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + try (Translog.Snapshot snapshot = translogManager().getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { opsRecovered = translogRecoveryRunner.run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); @@ -550,17 +581,17 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. - assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; - pendingTranslogRecovery.set(false); // we are good - now we can commit + assert translogManager().getPendingTranslogRecovery().get() : "translogRecovery is not pending but should be"; + translogManager().getPendingTranslogRecovery().set(false); // we are good - now we can commit logger.trace( () -> new ParameterizedMessage( "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", opsRecovered, - translog.currentFileGeneration() + translogManager().getTranslog().currentFileGeneration() ) ); flush(false, true); - translog.trimUnreferencedReaders(); + translogManager().getTranslog().trimUnreferencedReaders(); } private Translog openTranslog( @@ -584,12 +615,6 @@ private Translog openTranslog( ); } - // Package private for testing purposes only - Translog getTranslog() { - ensureOpen(); - return translog; - } - // Package private for testing purposes only boolean hasSnapshottedCommits() { return combinedDeletionPolicy.hasSnapshottedCommits(); @@ -597,12 +622,12 @@ boolean hasSnapshottedCommits() { @Override public boolean isTranslogSyncNeeded() { - return getTranslog().syncNeeded(); + return translogManager().getTranslog().syncNeeded(); } @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { - final boolean synced = translog.ensureSynced(locations); + final boolean synced = translogManager().getTranslog().ensureSynced(locations); if (synced) { revisitIndexDeletionPolicyOnTranslogSynced(); } @@ -611,25 +636,29 @@ public boolean ensureTranslogSynced(Stream locations) throws @Override public void syncTranslog() throws IOException { - translog.sync(); + translogManager().getTranslog().sync(); revisitIndexDeletionPolicyOnTranslogSynced(); } @Override public TranslogStats getTranslogStats() { - return getTranslog().stats(); + return translogManager().getTranslog().stats(); } @Override public Translog.Location getTranslogLastWriteLocation() { - return getTranslog().getLastWriteLocation(); + return translogManager().getTranslog().getLastWriteLocation(); } - private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { - if (combinedDeletionPolicy.hasUnreferencedCommits()) { - indexWriter.deleteUnusedFiles(); + private void revisitIndexDeletionPolicyOnTranslogSynced() { + try { + if (combinedDeletionPolicy.hasUnreferencedCommits()) { + indexWriter.deleteUnusedFiles(); + } + translogManager.getTranslog().trimUnreferencedReaders(); + } catch (IOException ex) { + throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex); } - translog.trimUnreferencedReaders(); } @Override @@ -718,7 +747,7 @@ public GetResult get(Get get, BiFunction // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 if (versionValue.getLocation() != null) { try { - Translog.Operation operation = translog.readOperation(versionValue.getLocation()); + Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation()); if (operation != null) { // in the case of a already pruned translog generation we might get null here - yet very unlikely final Translog.Index index = (Translog.Index) operation; @@ -1023,7 +1052,7 @@ public IndexResult index(Index index) throws IOException { if (index.origin().isFromTranslog() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { - location = translog.add(new Translog.Index(index, indexResult)); + location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no final NoOp noOp = new NoOp( @@ -1463,7 +1492,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); @@ -1790,7 +1819,8 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.getTranslog() + .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } } @@ -1891,11 +1921,10 @@ public boolean shouldPeriodicallyFlush() { final long localCheckpointOfLastCommit = Long.parseLong( lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); - final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo( - localCheckpointOfLastCommit + 1 - ).translogFileGeneration; + final long translogGenerationOfLastCommit = translogManager.getTranslog() + .getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { + if (translogManager.getTranslog().sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* @@ -1913,9 +1942,8 @@ public boolean shouldPeriodicallyFlush() { * * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo( - localCheckpointTracker.getProcessedCheckpoint() + 1 - ).translogFileGeneration; + final long translogGenerationOfNewCommit = translogManager.getTranslog() + .getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration; return translogGenerationOfLastCommit < translogGenerationOfNewCommit || localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @@ -1954,11 +1982,11 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { || getProcessedLocalCheckpoint() > Long.parseLong( lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) )) { - ensureCanFlush(); + translogManager.ensureCanFlush(); try { - translog.rollGeneration(); + translogManager.getTranslog().rollGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translog); + commitIndexWriter(indexWriter, translogManager.getTranslog()); logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved @@ -1971,7 +1999,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); - translog.trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2026,8 +2054,8 @@ private void refreshLastCommittedSegmentInfos() { public void rollTranslogGeneration() throws EngineException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - translog.rollGeneration(); - translog.trimUnreferencedReaders(); + translogManager().getTranslog().rollGeneration(); + translogManager().getTranslog().trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2045,7 +2073,7 @@ public void rollTranslogGeneration() throws EngineException { public void trimUnreferencedTranslogFiles() throws EngineException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - translog.trimUnreferencedReaders(); + translogManager().getTranslog().trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2061,14 +2089,14 @@ public void trimUnreferencedTranslogFiles() throws EngineException { @Override public boolean shouldRollTranslogGeneration() { - return getTranslog().shouldRollGeneration(); + return translogManager().getTranslog().shouldRollGeneration(); } @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - translog.trimOperations(belowTerm, aboveSeqNo); + translogManager().getTranslog().trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2248,8 +2276,8 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) { } failEngine("already closed by tragic event on the index writer", tragicException); engineFailed = true; - } else if (translog.isOpen() == false && translog.getTragicException() != null) { - failEngine("already closed by tragic event on the translog", translog.getTragicException()); + } else if (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() != null) { + failEngine("already closed by tragic event on the translog", translogManager.getTranslog().getTragicException()); engineFailed = true; } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by @@ -2274,7 +2302,7 @@ protected boolean maybeFailEngine(String source, Exception e) { return failOnTragicEvent((AlreadyClosedException) e); } else if (e != null && ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e) - || (translog.isOpen() == false && translog.getTragicException() == e))) { + || (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() == e))) { // this spot on - we are handling the tragic event exception here so we have to fail the engine // right away failEngine(source, e); @@ -2360,7 +2388,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { logger.warn("Failed to close ReaderManager", e); } try { - IOUtils.close(translog); + IOUtils.close(translogManager.getTranslog()); } catch (Exception e) { logger.warn("Failed to close translog", e); } @@ -2629,7 +2657,7 @@ protected void doRun() throws Exception { * @param translog the translog */ protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException { - ensureCanFlush(); + translogManager.ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); writer.setLiveCommitData(() -> { @@ -2684,16 +2712,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl } } - final void ensureCanFlush() { - // translog recovery happens after the engine is fully constructed. - // If we are in this stage we have to prevent flushes from this - // engine otherwise we might loose documents if the flush succeeds - // and the translog recovery fails when we "commit" the translog on flush. - if (pendingTranslogRecovery.get()) { - throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery"); - } - } - @Override public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { mergeScheduler.refreshConfig(); @@ -2705,7 +2723,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran // the setting will be re-interpreted if it's set to true updateAutoIdTimestamp(Long.MAX_VALUE, true); } - final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); @@ -2721,7 +2739,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() { @Override public long getLastSyncedGlobalCheckpoint() { - return getTranslog().getLastSyncedGlobalCheckpoint(); + return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); } public long getProcessedLocalCheckpoint() { @@ -2860,7 +2878,7 @@ public Translog.Snapshot newChangesSnapshot( @Override public Translog.Snapshot newChangesSnapshotFromTranslogFile(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { - return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); + return translogManager().getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); } public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 14608ddc14c34..3b9da1bff5c79 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -23,11 +23,14 @@ import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.TranslogException; +import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.WriteOnlyTranslogManager; +import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.search.suggest.completion.CompletionStats; import java.io.Closeable; @@ -49,18 +52,19 @@ * * @opensearch.internal */ -public class NRTReplicationEngine extends Engine { +public class NRTReplicationEngine extends Engine implements LifecycleAware { private volatile SegmentInfos lastCommittedSegmentInfos; private final NRTReplicationReaderManager readerManager; private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; - private final Translog translog; + private final TranslogManager translogManager; public NRTReplicationEngine(EngineConfig engineConfig) { super(engineConfig); store.incRef(); NRTReplicationReaderManager readerManager = null; + TranslogManager translogManagerRef = null; try { lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId)); @@ -71,18 +75,49 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.readerManager = readerManager; this.readerManager.addListener(completionStatsCache); - this.translog = openTranslog( - engineConfig, - getTranslogDeletionPolicy(engineConfig), + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); + translogManagerRef = new WriteOnlyTranslogManager( + engineConfig.getTranslogConfig(), + engineConfig.getPrimaryTermSupplier(), engineConfig.getGlobalCheckpointSupplier(), - localCheckpointTracker::markSeqNoAsPersisted + getTranslogDeletionPolicy(engineConfig), + shardId, + readLock, + this::getLocalCheckpointTracker, + translogUUID, + new TranslogEventListener() { + @Override + public void onFailure(String reason, Exception ex) { + failEngine(reason, ex); + } + + @Override + public void onAfterTranslogSync() { + try { + translogManager.getTranslog().trimUnreferencedReaders(); + } catch (IOException ex) { + throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex); + } + } + }, + this ); + this.translogManager = translogManagerRef; } catch (IOException e) { - IOUtils.closeWhileHandlingException(store::decRef, readerManager); + Translog translog = null; + if (translogManagerRef != null) { + translog = translogManagerRef.getTranslog(); + } + IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog); throw new EngineCreationFailureException(shardId, "failed to create engine", e); } } + public TranslogManager translogManager() { + return translogManager; + } + public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. readerManager.updateSegments(infos); @@ -91,7 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point. if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) { this.lastCommittedSegmentInfos = infos; - rollTranslogGeneration(); + translogManager.rollTranslogGeneration(); } localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } @@ -125,7 +160,7 @@ public boolean isThrottled() { public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - translog.trimOperations(belowTerm, aboveSeqNo); + translogManager.getTranslog().trimOperations(belowTerm, aboveSeqNo); } catch (Exception e) { try { failEngine("translog operations trimming failed", e); @@ -140,7 +175,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E public IndexResult index(Index index) throws IOException { ensureOpen(); IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false); - final Translog.Location location = translog.add(new Translog.Index(index, indexResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); indexResult.setTranslogLocation(location); indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -152,7 +187,7 @@ public IndexResult index(Index index) throws IOException { public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true); - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -164,7 +199,8 @@ public DeleteResult delete(Delete delete) throws IOException { public NoOpResult noOp(NoOp noOp) throws IOException { ensureOpen(); NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); - final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.getTranslog() + .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); @@ -184,22 +220,22 @@ protected ReferenceManager getReferenceManager(Search @Override public boolean isTranslogSyncNeeded() { - return translog.syncNeeded(); + return translogManager.getTranslog().syncNeeded(); } @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { - boolean synced = translog.ensureSynced(locations); + boolean synced = translogManager.getTranslog().ensureSynced(locations); if (synced) { - translog.trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } return synced; } @Override public void syncTranslog() throws IOException { - translog.sync(); - translog.trimUnreferencedReaders(); + translogManager.getTranslog().sync(); + translogManager.getTranslog().trimUnreferencedReaders(); } @Override @@ -242,12 +278,12 @@ public long getMinRetainedSeqNo() { @Override public TranslogStats getTranslogStats() { - return translog.stats(); + return translogManager.getTranslog().stats(); } @Override public Translog.Location getTranslogLastWriteLocation() { - return translog.getLastWriteLocation(); + return translogManager.getTranslog().getLastWriteLocation(); } @Override @@ -266,7 +302,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { @Override public long getLastSyncedGlobalCheckpoint() { - return translog.getLastSyncedGlobalCheckpoint(); + return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); } @Override @@ -302,7 +338,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { public void trimUnreferencedTranslogFiles() throws EngineException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - translog.trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (Exception e) { try { failEngine("translog trimming failed", e); @@ -315,15 +351,15 @@ public void trimUnreferencedTranslogFiles() throws EngineException { @Override public boolean shouldRollTranslogGeneration() { - return translog.shouldRollGeneration(); + return translogManager.getTranslog().shouldRollGeneration(); } @Override public void rollTranslogGeneration() throws EngineException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - translog.rollGeneration(); - translog.trimUnreferencedReaders(); + translogManager.getTranslog().rollGeneration(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (Exception e) { try { failEngine("translog trimming failed", e); @@ -370,7 +406,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - IOUtils.close(readerManager, translog, store::decRef); + IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); } finally { @@ -421,12 +457,12 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {} public Translog getTranslog() { - return translog; + return translogManager.getTranslog(); } @Override public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); } @@ -469,21 +505,4 @@ private Translog openTranslog( persistedSequenceNumberConsumer ); } - - private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) { - TranslogDeletionPolicy customTranslogDeletionPolicy = null; - if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) { - customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory() - .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier()); - } - return Objects.requireNonNullElseGet( - customTranslogDeletionPolicy, - () -> new DefaultTranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() - ) - ); - } - } diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 3a16d884fd899..af2cc10d6a5b1 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -45,7 +45,10 @@ import org.opensearch.index.shard.DocsStats; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.TranslogException; +import org.opensearch.index.translog.NoOpTranslogManager; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicy; @@ -149,6 +152,85 @@ public DocsStats docStats() { return docsStats; } + /** + * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} + * that retains nothing but the last translog generation from safe commit. + */ + public TranslogManager translogManager() { + try { + return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() { + @Override + public void close() {} + + @Override + public int totalOperations() { + return 0; + } + + @Override + public Translog.Operation next() { + return null; + } + + }) { + /** + * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} + * that retains nothing but the last translog generation from safe commit. + */ + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException { + final Store store = engineConfig.getStore(); + store.incRef(); + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + final List commits = DirectoryReader.listCommits(store.directory()); + if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); + if (translogUuid == null) { + throw new IllegalStateException("commit doesn't contain translog unique id"); + } + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + try ( + Translog translog = new Translog( + translogConfig, + translogUuid, + translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), + engineConfig.getPrimaryTermSupplier(), + seqNo -> {} + ) + ) { + translog.trimUnreferencedReaders(); + // refresh the translog stats + translogStats = translog.stats(); + assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + + " current gen " + + translog.currentFileGeneration() + + " != min gen " + + translog.getMinFileGeneration(); + } + } + } catch (final Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog", e); + } finally { + store.decRef(); + } + } + }; + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + /** * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} * that retains nothing but the last translog generation from safe commit. diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index f0cd4e704b027..ceea9eaac994c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -51,6 +51,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogManager; +import org.opensearch.index.translog.NoOpTranslogManager; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogStats; @@ -90,6 +92,7 @@ public class ReadOnlyEngine extends Engine { private final SafeCommitInfo safeCommitInfo; private final CompletionStatsCache completionStatsCache; private final boolean requireCompleteHistory; + private final TranslogManager translogManager; protected volatile TranslogStats translogStats; @@ -143,6 +146,21 @@ public ReadOnlyEngine( completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); + translogManager = new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() { + @Override + public void close() {} + + @Override + public int totalOperations() { + return 0; + } + + @Override + public Translog.Operation next() { + return null; + } + }); + success = true; } finally { if (success == false) { @@ -265,6 +283,10 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + public TranslogManager translogManager() { + return translogManager; + } + @Override protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; @@ -315,16 +337,18 @@ public NoOpResult noOp(NoOp noOp) { @Override public boolean isTranslogSyncNeeded() { - return false; + return translogManager.isTranslogSyncNeeded(); } @Override - public boolean ensureTranslogSynced(Stream locations) { - return false; + public boolean ensureTranslogSynced(Stream locations) throws IOException { + return translogManager.ensureTranslogSynced(locations); } @Override - public void syncTranslog() {} + public void syncTranslog() throws IOException { + translogManager.syncTranslog(); + } @Override public Closeable acquireHistoryRetentionLock() { @@ -380,7 +404,7 @@ public TranslogStats getTranslogStats() { @Override public Translog.Location getTranslogLastWriteLocation() { - return new Translog.Location(0, 0, 0); + return translogManager.getTranslogLastWriteLocation(); } @Override @@ -388,6 +412,12 @@ public long getPersistedLocalCheckpoint() { return seqNoStats.getLocalCheckpoint(); } + public long getProcessedLocalCheckpoint() { + // the read-only engine does not process checkpoints, so its + // processed checkpoint is identical to its persisted one. + return getPersistedLocalCheckpoint(); + } + @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); @@ -463,15 +493,19 @@ public void activateThrottling() {} public void deactivateThrottling() {} @Override - public void trimUnreferencedTranslogFiles() {} + public void trimUnreferencedTranslogFiles() { + translogManager.trimUnreferencedTranslogFiles(); + } @Override public boolean shouldRollTranslogGeneration() { - return false; + return translogManager.shouldRollTranslogGeneration(); } @Override - public void rollTranslogGeneration() {} + public void rollTranslogGeneration() { + translogManager.rollTranslogGeneration(); + } @Override public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) { @@ -497,10 +531,14 @@ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryR } @Override - public void skipTranslogRecovery() {} + public void skipTranslogRecovery() { + translogManager.skipTranslogRecovery(); + } @Override - public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {} + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) { + translogManager.trimOperationsFromTranslog(belowTerm, aboveSeqNo); + } @Override public void maybePruneDeletes() {} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 22f72cc3d9acd..6f5d57ad388bf 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -41,6 +41,10 @@ public class InternalTranslogManager implements TranslogManager { private final TranslogEventListener translogEventListener; private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); + public AtomicBoolean getPendingTranslogRecovery() { + return pendingTranslogRecovery; + } + public InternalTranslogManager( TranslogConfig translogConfig, LongSupplier primaryTermSupplier, @@ -81,11 +85,11 @@ public void rollTranslogGeneration() throws TranslogException { translog.rollGeneration(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { - translogEventListener.onTragicFailure(e); + translogEventListener.onFailure("translog roll generation failed", e); throw e; } catch (Exception e) { try { - translogEventListener.onFailure("translog trimming failed", e); + translogEventListener.onFailure("translog roll generation failed", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -204,15 +208,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { engineLifeCycleAware.ensureOpen(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { - translogEventListener.onTragicFailure(e); + translogEventListener.onFailure("translog trimming unreferenced translog failed", e); throw e; } catch (Exception e) { try { - translogEventListener.onFailure("translog trimming failed", e); + translogEventListener.onFailure("translog trimming unreferenced translog failed", e); } catch (Exception inner) { e.addSuppressed(inner); } - throw new TranslogException(shardId, "failed to trim translog", e); + throw new TranslogException(shardId, "failed to trim unreferenced translog translog", e); } } @@ -237,7 +241,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T engineLifeCycleAware.ensureOpen(); translog.trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { - translogEventListener.onTragicFailure(e); + translogEventListener.onFailure("translog operations trimming failed", e); throw e; } catch (Exception e) { try { @@ -309,11 +313,14 @@ private Translog openTranslog( /** * Returns the the translog instance - * @param ensureOpen check if the engine is open * @return the {@link Translog} instance */ @Override - public Translog getTranslog(boolean ensureOpen) { + public Translog getTranslog() { + return translog; + } + + private Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { this.engineLifeCycleAware.ensureOpen(); } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 07cae808ce071..88e6ce97b2784 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -93,7 +93,7 @@ public boolean shouldRollTranslogGeneration() { public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} @Override - public Translog getTranslog(boolean ensureOpen) { + public Translog getTranslog() { return null; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 988a88c5d2ae5..dc2c2e20015b0 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -96,10 +96,9 @@ public interface TranslogManager { /** * Returns the instance of the translog with a precondition - * @param ensureOpen check if the engine is open * @return the translog instance */ - Translog getTranslog(boolean ensureOpen); + Translog getTranslog(); /** * Checks if the translog has a pending recovery diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java index 731b069ab0c74..b738fa0feea59 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -11,8 +11,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.TranslogException; import java.util.ArrayList; import java.util.Collection; @@ -27,9 +28,11 @@ public final class CompositeTranslogEventListener implements TranslogEventListener { private final List listeners; + private final ShardId shardId; private final Logger logger = LogManager.getLogger(CompositeTranslogEventListener.class); - public CompositeTranslogEventListener(Collection listeners) { + public CompositeTranslogEventListener(Collection listeners, ShardId shardId) { + this.shardId = shardId; for (TranslogEventListener listener : listeners) { if (listener == null) { throw new IllegalArgumentException("listeners must be non-null"); @@ -49,7 +52,7 @@ public void onAfterTranslogSync() { exceptionList.add(ex); } } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + maybeThrowTranslogExceptionAndSuppress(exceptionList); } @Override @@ -63,7 +66,7 @@ public void onAfterTranslogRecovery() { exceptionList.add(ex); } } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + maybeThrowTranslogExceptionAndSuppress(exceptionList); } @Override @@ -77,7 +80,7 @@ public void onBeginTranslogRecovery() { exceptionList.add(ex); } } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + maybeThrowTranslogExceptionAndSuppress(exceptionList); } @Override @@ -91,20 +94,16 @@ public void onFailure(String reason, Exception e) { exceptionList.add(ex); } } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + maybeThrowTranslogExceptionAndSuppress(exceptionList); } - @Override - public void onTragicFailure(AlreadyClosedException e) { - List exceptionList = new ArrayList<>(listeners.size()); - for (TranslogEventListener listener : listeners) { - try { - listener.onTragicFailure(e); - } catch (Exception ex) { - logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); - exceptionList.add(ex); - } + private void maybeThrowTranslogExceptionAndSuppress(List exceptions) { + T main = null; + for (T ex : exceptions) { + main = ExceptionsHelper.useOrSuppress(main, ex); + } + if (main != null) { + throw new TranslogException(shardId, "Error while executing translog event listener", main); } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); } } diff --git a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java index 1862b4b9a62b7..664cdd6c60985 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java @@ -8,8 +8,6 @@ package org.opensearch.index.translog.listener; -import org.apache.lucene.store.AlreadyClosedException; - /** * The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager} * @@ -35,12 +33,6 @@ default void onAfterTranslogRecovery() {} */ default void onBeginTranslogRecovery() {} - /** - * Invoked when translog operations run into accessing an already closed resource - * @param ex the exception thrown when accessing a closed resource - */ - default void onTragicFailure(AlreadyClosedException ex) {} - /** * Invoked when translog operations run into any other failure * @param reason the failure reason diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index b04331fd40a21..345adfe7a8891 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -145,7 +145,9 @@ import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; +import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.VersionUtils; @@ -794,20 +796,20 @@ public long getProcessedCheckpoint() { } public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { - engine.ensureCanFlush(); // recovered already + engine.translogManager().ensureCanFlush(); // recovered already ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); engine.close(); engine = new InternalEngine(engine.config()); - expectThrows(IllegalStateException.class, engine::ensureCanFlush); + expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); if (randomBoolean()) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); } else { - engine.skipTranslogRecovery(); + engine.translogManager().skipTranslogRecovery(); } - engine.ensureCanFlush(); // ready + engine.translogManager().ensureCanFlush(); // ready doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); engine.flush(); @@ -858,8 +860,10 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { } finally { IOUtils.close(engine); } - try (Engine recoveringEngine = new InternalEngine(engine.config())) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + try (InternalEngine recoveringEngine = new InternalEngine(engine.config())) { + TranslogHandler translogHandler = createTranslogHandler(engine.config().getIndexSettings(), recoveringEngine); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); recoveringEngine.refresh("test"); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -883,7 +887,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { IOUtils.close(initialEngine); } - Engine recoveringEngine = null; + InternalEngine recoveringEngine = null; try { final AtomicBoolean committed = new AtomicBoolean(); recoveringEngine = new InternalEngine(initialEngine.config()) { @@ -894,8 +898,10 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I super.commitIndexWriter(writer, translog); } }; - assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(recoveringEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(docs)); + TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertTrue(committed.get()); } finally { IOUtils.close(recoveringEngine); @@ -907,7 +913,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { final List seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList()); Randomness.shuffle(seqNos); Engine initialEngine = null; - Engine recoveringEngine = null; + InternalEngine recoveringEngine = null; Store store = createStore(); final AtomicInteger counter = new AtomicInteger(); try { @@ -929,7 +935,9 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { } initialEngine.close(); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); recoveringEngine.refresh("test"); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), docs); @@ -952,23 +960,25 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); if (rarely()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } else if (rarely()) { engine.flush(randomBoolean(), true); } } maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getProcessedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); } try (InternalEngine engine = new InternalEngine(config)) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertThat(engine.getProcessedLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); } try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.recoverFromTranslog(translogHandler, upToSeqNo); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), upToSeqNo); assertThat(engine.getProcessedLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); } @@ -1283,26 +1293,30 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); if (inSync) { - engine.syncTranslog(); // to advance persisted local checkpoint + engine.translogManager().syncTranslog(); // to advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); } engine.flush(); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(3L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(3L)); } public void testSyncTranslogConcurrently() throws Exception { @@ -1315,7 +1329,7 @@ public void testSyncTranslogConcurrently() throws Exception { applyOperations(engine, ops); engine.flush(true, true); final CheckedRunnable checker = () -> { - assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( @@ -1331,7 +1345,7 @@ public void testSyncTranslogConcurrently() throws Exception { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); try { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); checker.run(); } catch (IOException e) { throw new AssertionError(e); @@ -1386,7 +1400,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1420,7 +1434,8 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { EngineConfig config = engine.config(); engine.close(); engine = new InternalEngine(config); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertNull( "Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID) @@ -1732,7 +1747,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { long localCheckpoint = engine.getProcessedLocalCheckpoint(); globalCheckpoint.set(randomLongBetween(0, localCheckpoint)); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final long safeCommitCheckpoint; try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); @@ -1763,7 +1778,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { indexSettings.getSoftDeleteRetentionOperations() ); globalCheckpoint.set(localCheckpoint); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); @@ -1821,7 +1836,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc } engine.flush(); globalCheckpoint.set(randomLongBetween(0, engine.getPersistedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final long minSeqNoToRetain; try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { long safeCommitLocalCheckpoint = Long.parseLong( @@ -1869,12 +1884,12 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc if (useRecoverySource == false) { liveDocsWithSource.add(doc.id()); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(randomBoolean(), true); } else { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); } engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); @@ -2232,7 +2247,7 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm; if (rarely()) { currentTerm.set(currentTerm.get() + 1L); - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } final long correctVersion = docDeleted ? Versions.MATCH_DELETED : lastOpVersion; logger.info( @@ -2778,7 +2793,7 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { } } - initialEngine.syncTranslog(); // to advance persisted local checkpoint + initialEngine.translogManager().syncTranslog(); // to advance persisted local checkpoint if (randomInt(10) < 3) { // only update rarely as we do it every doc @@ -2805,8 +2820,11 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint) ); - initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint - assertThat(initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + initialEngine.ensureOpen(); + initialEngine.translogManager().getTranslog().sync(); // to guarantee the global checkpoint is written to the translog + // checkpoint + initialEngine.ensureOpen(); + assertThat(initialEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat(Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2814,14 +2832,17 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { } try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo) ); - assertThat(recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + recoveringEngine.ensureOpen(); + assertThat(recoveringEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert @@ -3225,24 +3246,32 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { try (InternalEngine engine = createEngine(config)) { engine.index(firstIndexRequest); - engine.syncTranslog(); // to advance persisted local checkpoint + engine.translogManager().syncTranslog(); // to advance persisted local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); + expectThrows( + IllegalStateException.class, + () -> engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE) + ); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } // open and recover tlog { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { - expectThrows(IllegalStateException.class, engine::ensureCanFlush); + expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } } @@ -3257,10 +3286,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); + engine.ensureOpen(); + assertEquals(2, engine.translogManager().getTranslog().currentFileGeneration()); + assertEquals(0L, engine.translogManager().getTranslog().stats().getUncommittedOperations()); } } @@ -3269,10 +3300,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } } @@ -3340,7 +3373,7 @@ public void testTranslogReplayWithFailure() throws IOException { try { engine = createEngine(store, translogPath); started = true; - } catch (EngineException | IOException e) { + } catch (EngineException | TranslogException | IOException e) { logger.trace("exception on open", e); } directory.setRandomIOExceptionRateOnOpen(0.0); @@ -3393,10 +3426,10 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I } } ) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); - engine.syncTranslog(); // to advance local checkpoint + engine.translogManager().syncTranslog(); // to advance local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); throwErrorOnCommit.set(true); @@ -3408,12 +3441,15 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier) ) ) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertVisibleCount(engine, 1); final long localCheckpoint = Long.parseLong( engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); - final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; + engine.ensureOpen(); + final long committedGen = engine.translogManager() + .getTranslog() + .getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); @@ -3447,7 +3483,7 @@ public void testSkipTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs); engine.close(); try (InternalEngine engine = new InternalEngine(config)) { - engine.skipTranslogRecovery(); + engine.translogManager().skipTranslogRecovery(); try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits.value, equalTo(0L)); @@ -3489,19 +3525,20 @@ public void testTranslogReplay() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine); engine.close(); // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); assertEquals(numDocs, translogHandler.appliedOperations()); engine.close(); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); @@ -3555,7 +3592,7 @@ public void testTranslogReplay() throws IOException { } engine.close(); - translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); engine.refresh("warm_up"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { @@ -3597,7 +3634,8 @@ public void testRecoverFromForeignTranslog() throws IOException { assertThat(index.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - Translog.TranslogGeneration generation = engine.getTranslog().getGeneration(); + engine.ensureOpen(); + Translog.TranslogGeneration generation = engine.translogManager().getTranslog().getGeneration(); engine.close(); final Path badTranslogLog = createTempDir(); @@ -3693,7 +3731,8 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier commits = DirectoryReader.listCommits(store.directory()); assertThat( Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), @@ -5893,9 +5950,10 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } // Global checkpoint advanced enough - only the last commit is kept. globalCheckpoint.set(randomLongBetween(engine.getPersistedLocalCheckpoint(), Long.MAX_VALUE)); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); - assertThat(engine.getTranslog().totalOperations(), equalTo(0)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().totalOperations(), equalTo(0)); } } @@ -5918,7 +5976,7 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit. } globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final List commits = DirectoryReader.listCommits(store.directory()); for (int i = 0; i < numSnapshots - 1; i++) { snapshots.get(i).close(); @@ -5933,12 +5991,13 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. - final Translog translog = engine.getTranslog(); + engine.ensureOpen(); + final Translog translog = engine.translogManager().getTranslog(); final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> { long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); }; - final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() + final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { @@ -5946,10 +6005,11 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.index(indexForDoc(doc)); } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); + engine.ensureOpen(); long flushThreshold = RandomNumbers.randomLongBetween( random(), 120, - engine.getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine + engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine ); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) @@ -5965,7 +6025,8 @@ public void testShouldPeriodicallyFlush() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); @@ -6021,11 +6082,13 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(2)); engine.refresh("test"); engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()); assertBusy(() -> { @@ -6060,8 +6123,9 @@ public void testStressShouldPeriodicallyFlush() throws Exception { final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - if (rarely() && engine.getTranslog().shouldRollGeneration()) { - engine.rollTranslogGeneration(); + engine.ensureOpen(); + if (rarely() && engine.translogManager().getTranslog().shouldRollGeneration()) { + engine.translogManager().rollTranslogGeneration(); } if (rarely() || engine.shouldPeriodicallyFlush()) { engine.flush(); @@ -6282,8 +6346,9 @@ public void testTrimUnsafeCommits() throws Exception { } } globalCheckpoint.set(randomInt(maxSeqNo)); - engine.syncTranslog(); - minTranslogGen = engine.getTranslog().getMinFileGeneration(); + engine.translogManager().syncTranslog(); + engine.ensureOpen(); + minTranslogGen = engine.translogManager().getTranslog().getMinFileGeneration(); } store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath()); @@ -6484,7 +6549,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } existingSeqNos.add(result.getSeqNo()); if (randomBoolean()) { - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set( randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getPersistedCheckpoint()) @@ -6720,7 +6785,7 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { flushedOperations.add(op); applyOperation(engine, op); if (randomBoolean()) { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { @@ -6780,7 +6845,8 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { equalTo(seqNosInSafeCommit.contains(op.seqNo())) ); } - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertThat(getDocIds(engine, true), equalTo(docs)); } } @@ -6834,8 +6900,9 @@ public void testStoreHonorsLuceneVersion() throws IOException { public void testMaxSeqNoInCommitUserData() throws Exception { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> { - while (running.get() && engine.getTranslog().currentFileGeneration() < 500) { - engine.rollTranslogGeneration(); // make adding operations to translog slower + engine.ensureOpen(); + while (running.get() && engine.translogManager().getTranslog().currentFileGeneration() < 500) { + engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower } }); rollTranslog.start(); @@ -6974,7 +7041,7 @@ public void testRecoverFromLocalTranslog() throws Exception { for (Engine.Operation op : operations) { applyOperation(engine, op); if (randomBoolean()) { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { @@ -6989,21 +7056,23 @@ public void testRecoverFromLocalTranslog() throws Exception { } if (randomBoolean()) { // engine is flushed properly before shutting down. - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(); } docs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(config)) { + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine); engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, 0); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { + engine.ensureOpen(); assertThat( "engine should trim all unreferenced translog after recovery", - engine.getTranslog().getMinFileGeneration(), - equalTo(engine.getTranslog().currentFileGeneration()) + engine.translogManager().getTranslog().getMinFileGeneration(), + equalTo(engine.translogManager().getTranslog().currentFileGeneration()) ); } } @@ -7075,8 +7144,8 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E ); } primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE)); - engine.rollTranslogGeneration(); - engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog + engine.translogManager().rollTranslogGeneration(); + engine.translogManager().trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); diff --git a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java index e3117e179e7fa..d1a3097005e6c 100644 --- a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java @@ -226,7 +226,7 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { engine.refresh("test"); } if (rarely()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } if (rarely()) { engine.flush(); @@ -300,11 +300,12 @@ class Follower extends Thread { this.leader = leader; this.isDone = isDone; this.readLatch = readLatch; + this.engine = createEngine(createStore(), createTempDir()); this.translogHandler = new TranslogHandler( xContentRegistry(), - IndexSettingsModule.newIndexSettings(shardId.getIndexName(), leader.engineConfig.getIndexSettings().getSettings()) + IndexSettingsModule.newIndexSettings(shardId.getIndexName(), leader.engineConfig.getIndexSettings().getSettings()), + engine ); - this.engine = createEngine(createStore(), createTempDir()); } void pullOperations(InternalEngine follower) throws IOException { @@ -315,7 +316,7 @@ void pullOperations(InternalEngine follower) throws IOException { long batchSize = randomLongBetween(0, 100); long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) { - translogHandler.run(follower, snapshot); + translogHandler.run(snapshot); } } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 6aa00bb9312dd..6d3e54e0648f1 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -84,7 +84,8 @@ public void testEngineWritesOpsToTranslog() throws Exception { // recover a new engine from the nrtEngine's xlog. nrtEngine.syncTranslog(); try (InternalEngine engine = new InternalEngine(nrtEngine.config())) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(nrtEngine.config().getIndexSettings(), engine); + engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertEquals(getDocIds(engine, true), docs); } assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); @@ -144,7 +145,7 @@ public void testUpdateSegments() throws Exception { assertEquals( nrtEngine.getTranslog().getGeneration().translogFileGeneration, - engine.getTranslog().getGeneration().translogFileGeneration + engine.translogManager().getTranslog().getGeneration().translogFileGeneration ); try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index e2f6fa9489885..1ab7e3d7bc093 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -145,7 +145,7 @@ public void testNoOpEngineStats() throws Exception { if (rarely()) { engine.flush(); } - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); } @@ -154,7 +154,7 @@ public void testNoOpEngineStats() throws Exception { String delId = Integer.toString(i); Engine.DeleteResult result = engine.delete(new Engine.Delete(delId, newUid(delId), primaryTerm.get())); assertTrue(result.isFound()); - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); deletions += 1; } @@ -217,20 +217,24 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { engine.flush(); } if (randomBoolean()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } } // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. - final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot(); + engine.ensureOpen(); + final Translog.Snapshot snapshot = engine.translogManager().getTranslog().newSnapshot(); engine.flush(true, true); engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); - noOpEngine.trimUnreferencedTranslogFiles(); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); - assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); + assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); + noOpEngine.translogManager().trimUnreferencedTranslogFiles(); + assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); + assertThat(noOpEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat( + noOpEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), + equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES) + ); snapshot.close(); noOpEngine.close(); } diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2106c5e1067fb..017c6610bfb29 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -93,13 +93,13 @@ public void testReadOnlyEngine() throws Exception { } globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); engine.flush(); readOnlyEngine = new ReadOnlyEngine( engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()), - engine.getTranslogStats(), + engine.translogManager().getTranslogStats(), false, Function.identity(), true @@ -139,7 +139,9 @@ public void testReadOnlyEngine() throws Exception { } // Close and reopen the main engine try (InternalEngine recoveringEngine = new InternalEngine(config)) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), recoveringEngine); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); @@ -178,7 +180,7 @@ public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { ); maxSeqNo = engine.getProcessedLocalCheckpoint(); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint() - 1); engine.flushAndClose(); @@ -274,12 +276,13 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException { } globalCheckpoint.set(i); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); engine.flushAndClose(); } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { - final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); + final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings(), readOnlyEngine); + readOnlyEngine.translogManager() + .recoverFromTranslog(translogHandler, readOnlyEngine.getProcessedLocalCheckpoint(), randomNonNegativeLong()); assertThat(translogHandler.appliedOperations(), equalTo(0L)); } @@ -324,23 +327,26 @@ public void testTranslogStats() throws IOException { } assertThat( - engine.getTranslogStats().estimatedNumberOfOperations(), + engine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? uncommittedDocs : numDocs) ); - assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs)); - assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); - assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); - assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs)); + assertThat(engine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); engine.flush(true, true); } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { - assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); - assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); - assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); - assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); + assertThat( + readOnlyEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), + equalTo(softDeletesEnabled ? 0 : numDocs) + ); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); } } } diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index eea316d9a9370..62f9be84179ba 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -100,7 +100,7 @@ */ public class RefreshListenersTests extends OpenSearchTestCase { private RefreshListeners listeners; - private Engine engine; + private InternalEngine engine; private volatile int maxListeners; private ThreadPool threadPool; private Store store; @@ -173,8 +173,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineTestCase.tombstoneDocSupplier() ); engine = new InternalEngine(config); - engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); - listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); + engine.translogManager().recoverFromTranslog((s) -> 0, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); + listeners.setCurrentRefreshLocationSupplier(engine.translogManager()::getTranslogLastWriteLocation); } @After diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 4db792b4a3fc2..9cc58e250d74e 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -56,7 +56,7 @@ public void testRecoveryFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -64,7 +64,7 @@ public void testRecoveryFromTranslog() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -103,7 +103,7 @@ public void onBeginTranslogRecovery() { assertTrue(onTranslogRecoveryInvoked.get()); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -131,7 +131,7 @@ public void testTranslogRollsGeneration() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -139,7 +139,7 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -164,7 +164,7 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, opsRecovered.get()); assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -192,7 +192,7 @@ public void testTrimOperationsFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); } long maxSeqNo = tracker.getMaxSeqNo(); assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); @@ -202,7 +202,7 @@ public void testTrimOperationsFromTranslog() throws IOException { translogManager.rollTranslogGeneration(); translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -227,7 +227,7 @@ public void testTrimOperationsFromTranslog() throws IOException { assertEquals(0, opsRecovered.get()); assertEquals(0, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException { @Override public void onAfterTranslogSync() { try { - translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders(); + translogManagerAtomicReference.get().getTranslog().trimUnreferencedReaders(); syncListenerInvoked.set(true); } catch (IOException ex) { fail("Failed due to " + ex); @@ -265,15 +265,15 @@ public void onAfterTranslogSync() { translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.syncTranslog(); - assertThat(translogManager.getTranslog(true).currentFileGeneration(), equalTo(2L)); - assertThat(translogManager.getTranslog(true).getMinFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog().currentFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog().getMinFileGeneration(), equalTo(2L)); assertTrue(syncListenerInvoked.get()); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } } diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java index 751ee67236703..1f28e32a6dbec 100644 --- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -8,7 +8,8 @@ package org.opensearch.index.translog.listener; -import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.index.Index; +import org.opensearch.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; import java.lang.reflect.Proxy; @@ -26,7 +27,6 @@ public void testCompositeTranslogEventListener() { AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); AtomicInteger onFailureInvoked = new AtomicInteger(); - AtomicInteger onTragicFailureInvoked = new AtomicInteger(); TranslogEventListener listener = new TranslogEventListener() { @Override @@ -48,27 +48,23 @@ public void onBeginTranslogRecovery() { public void onFailure(String reason, Exception ex) { onFailureInvoked.incrementAndGet(); } - - @Override - public void onTragicFailure(AlreadyClosedException ex) { - onTragicFailureInvoked.incrementAndGet(); - } }; final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener)); Collections.shuffle(translogEventListeners, random()); - TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + TranslogEventListener compositeListener = new CompositeTranslogEventListener( + translogEventListeners, + new ShardId(new Index("indexName", "indexUuid"), 123) + ); compositeListener.onAfterTranslogRecovery(); compositeListener.onAfterTranslogSync(); compositeListener.onBeginTranslogRecovery(); compositeListener.onFailure("reason", new RuntimeException("reason")); - compositeListener.onTragicFailure(new AlreadyClosedException("reason")); assertEquals(2, onBeginTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogSyncInvoked.get()); assertEquals(2, onFailureInvoked.get()); - assertEquals(2, onTragicFailureInvoked.get()); } public void testCompositeTranslogEventListenerOnExceptions() { @@ -76,7 +72,6 @@ public void testCompositeTranslogEventListenerOnExceptions() { AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); AtomicInteger onFailureInvoked = new AtomicInteger(); - AtomicInteger onTragicFailureInvoked = new AtomicInteger(); TranslogEventListener listener = new TranslogEventListener() { @Override @@ -98,11 +93,6 @@ public void onBeginTranslogRecovery() { public void onFailure(String reason, Exception ex) { onFailureInvoked.incrementAndGet(); } - - @Override - public void onTragicFailure(AlreadyClosedException ex) { - onTragicFailureInvoked.incrementAndGet(); - } }; TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( @@ -113,18 +103,18 @@ public void onTragicFailure(AlreadyClosedException ex) { final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener)); Collections.shuffle(translogEventListeners, random()); - TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + TranslogEventListener compositeListener = new CompositeTranslogEventListener( + translogEventListeners, + new ShardId(new Index("indexName", "indexUuid"), 123) + ); expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery()); expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync()); expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery()); expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason"))); - expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason"))); assertEquals(2, onBeginTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogRecoveryInvoked.get()); assertEquals(2, onTranslogSyncInvoked.get()); assertEquals(2, onFailureInvoked.get()); - assertEquals(2, onTragicFailureInvoked.get()); - } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 66c697d83510b..a8b5b03e03877 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -113,6 +113,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.DummyShardLock; @@ -221,7 +222,6 @@ public void setUp() throws Exception { Lucene.cleanLuceneIndex(store.directory()); Lucene.cleanLuceneIndex(storeReplica.directory()); primaryTranslogDir = createTempDir("translog-primary"); - translogHandler = createTranslogHandler(defaultSettings); engine = createEngine(store, primaryTranslogDir); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); @@ -328,10 +328,12 @@ public void tearDown() throws Exception { super.tearDown(); try { if (engine != null && engine.isClosed.get() == false) { - assertEngineCleanedUp(engine, engine.getTranslog()); + engine.ensureOpen(); + assertEngineCleanedUp(engine, engine.translogManager().getTranslog()); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog()); + replicaEngine.ensureOpen(); + assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog()); } } finally { IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); @@ -339,13 +341,11 @@ public void tearDown() throws Exception { } protected void assertEngineCleanedUp(Engine engine, Translog translog) throws Exception { - if (engine.isClosed.get() == false) { - translog.getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); - assertNoInFlightDocuments(engine); - assertMaxSeqNoInCommitUserData(engine); - assertAtMostOneLuceneDocumentPerSequenceNumber(engine); - } + translog.getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); + assertNoInFlightDocuments(engine); + assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } protected static ParseContext.Document testDocumentWithTextField() { @@ -524,8 +524,8 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup ); } - protected TranslogHandler createTranslogHandler(IndexSettings indexSettings) { - return new TranslogHandler(xContentRegistry(), indexSettings); + protected TranslogHandler createTranslogHandler(IndexSettings indexSettings, Engine engine) { + return new TranslogHandler(xContentRegistry(), indexSettings, engine); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -662,12 +662,13 @@ protected InternalEngine createEngine( } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + translogHandler = createTranslogHandler(config.getIndexSettings(), internalEngine); + internalEngine.translogManager().recoverFromTranslog(translogHandler, internalEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); return internalEngine; } public static InternalEngine createEngine(EngineConfig engineConfig, int maxDocs) { - return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new); + return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } @FunctionalInterface @@ -1479,7 +1480,8 @@ public static MapperService createMapperService() throws IOException { public static Translog getTranslog(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.getTranslog(); + internalEngine.ensureOpen(); + return internalEngine.translogManager().getTranslog(); } /** diff --git a/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java b/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java index 2c22c391ea3a3..20767c3b029ae 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java @@ -35,6 +35,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; import java.util.Map; @@ -55,7 +56,7 @@ class InternalTestEngine extends InternalEngine { int maxDocs, BiFunction localCheckpointTrackerSupplier ) { - super(engineConfig, maxDocs, localCheckpointTrackerSupplier); + super(engineConfig, maxDocs, localCheckpointTrackerSupplier, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } @Override diff --git a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java index e1f2357aa2400..e86f9d371608e 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java @@ -50,6 +50,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.mapper.MapperRegistry; @@ -61,17 +62,20 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -public class TranslogHandler implements Engine.TranslogRecoveryRunner { +public class TranslogHandler implements TranslogRecoveryRunner { private final MapperService mapperService; private final AtomicLong appliedOperations = new AtomicLong(); + private final Engine engine; + long appliedOperations() { return appliedOperations.get(); } - public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings, Engine engine) { + this.engine = engine; Map analyzers = new HashMap<>(); analyzers.put(AnalysisRegistry.DEFAULT_ANALYZER_NAME, new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer())); IndexAnalyzers indexAnalyzers = new IndexAnalyzers(analyzers, emptyMap(), emptyMap()); @@ -112,7 +116,7 @@ private void applyOperation(Engine engine, Engine.Operation operation) throws IO } @Override - public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { + public int run(Translog.Snapshot snapshot) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) {