diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 333dd769eaf68..a56335d7599e3 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -298,7 +298,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
                 throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
             }
             if (generation.translogUUID == null) {
-                throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
+                throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
             }
         }
         final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
                 try {
                     translog.prepareCommit();
                     logger.trace("starting commit for flush; commitTranslog=true");
-                    commitIndexWriter(indexWriter, translog, null);
+                    final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
                     logger.trace("finished commit for flush");
                     // we need to refresh in order to clear older version values
                     refresh("version_table_flush");
                     // after refresh documents can be retrieved from the index so we can now commit the translog
-                    translog.commit();
+                    translog.commit(committedGeneration);
                 } catch (Exception e) {
                     throw new FlushFailedEngineException(shardId, e);
                 }
@@ -1680,55 +1680,65 @@ protected void doRun() throws Exception {
         }
     }
 
-    private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+    /**
+     * Commits the specified index writer.
+     *
+     * @param writer   the index writer to commit
+     * @param translog the translog
+     * @param syncId   the sync flush ID ({@code null} if not committing a synced flush)
+     * @return the minimum translog generation for the local checkpoint committed with the specified index writer
+     * @throws IOException if an I/O exception occurs committing the specified writer
+     */
+    private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
         ensureCanFlush();
         try {
-            Translog.TranslogGeneration translogGeneration = translog.getGeneration();
-
-            final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
+            final long localCheckpoint = seqNoService().getLocalCheckpoint();
+            final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
+            final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
             final String translogUUID = translogGeneration.translogUUID;
-            final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
+            final String localCheckpointValue = Long.toString(localCheckpoint);
 
             writer.setLiveCommitData(() -> {
                 /*
                  * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
-                 * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
+                 * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
                  * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
                  * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
-                 * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
-                 * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
-                 * of the commit data iterator (which occurs after all documents have been flushed to Lucene).
+                 * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
+                 * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
+                 * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
                  */
-                final Map<String, String> commitData = new HashMap<>(6);
-                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
+                final Map<String, String> commitData = new HashMap<>(5);
+                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
                 commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
-                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
+                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
                 if (syncId != null) {
                     commitData.put(Engine.SYNC_COMMIT_ID, syncId);
                 }
                 commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
-                if (logger.isTraceEnabled()) {
-                    logger.trace("committing writer with commit data [{}]", commitData);
-                }
+                logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();
             });
             writer.commit();
-        } catch (Exception ex) {
+            return translogGeneration.translogFileGeneration;
+        } catch (final Exception ex) {
            try {
                 failEngine("lucene commit failed", ex);
-            } catch (Exception inner) {
+            } catch (final Exception inner) {
                 ex.addSuppressed(inner);
             }
             throw ex;
-        } catch (AssertionError e) {
-            // IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
-            // randomly throw FNFE/NSFE can also hit this:
+        } catch (final AssertionError e) {
+            /*
+             * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
+             * throw FileNotFoundException or NoSuchFileException can also hit this.
+             */
             if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
-                EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
+                final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                 try {
                     failEngine("lucene commit failed", engineException);
-                } catch (Exception inner) {
+                } catch (final Exception inner) {
                     engineException.addSuppressed(inner);
                 }
                 throw engineException;
@@ -1812,7 +1822,7 @@ public boolean isRecovering() {
      * Gets the commit data from {@link IndexWriter} as a map.
      */
    private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
-        Map<String, String> commitData = new HashMap<>(6);
+        Map<String, String> commitData = new HashMap<>(5);
         for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
             commitData.put(entry.getKey(), entry.getValue());
         }
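Note on the commitIndexWriter change above: the translog generation and the local checkpoint are captured eagerly, before Lucene flushes segments, while max_seq_no has to cover every document that makes it into the commit and is therefore read lazily, when IndexWriter pulls the commit-data iterator inside commit(). A minimal sketch of that pattern outside the engine follows; the class, method, and string keys are invented for illustration (the real code uses the Translog and SequenceNumbers constants), and only setLiveCommitData and commit are actual Lucene API.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.LongSupplier;

    import org.apache.lucene.index.IndexWriter;

    class LazyCommitDataSketch {

        // Eager values are captured into effectively-final locals; max_seq_no is
        // computed only when IndexWriter#commit() iterates the live commit data,
        // i.e. after all documents have been flushed to Lucene.
        static void commitWithLazyMaxSeqNo(
                final IndexWriter writer,
                final String translogGeneration,
                final String translogUUID,
                final long localCheckpoint,
                final LongSupplier maxSeqNo) throws IOException {
            writer.setLiveCommitData(() -> {
                final Map<String, String> commitData = new HashMap<>(5);
                commitData.put("translog_generation", translogGeneration); // Translog.TRANSLOG_GENERATION_KEY
                commitData.put("translog_uuid", translogUUID);             // Translog.TRANSLOG_UUID_KEY
                commitData.put("local_checkpoint", Long.toString(localCheckpoint));
                commitData.put("max_seq_no", Long.toString(maxSeqNo.getAsLong())); // evaluated at commit time
                return commitData.entrySet().iterator();
            });
            writer.commit();
        }

    }

Because the Iterable is evaluated at iteration time, the supplier observes the maximum sequence number only after all documents have been flushed, which is the ordering the comment in the change relies on.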
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index d9a8cc408f822..014500230a404 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
  * When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
  * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
  * the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
- * generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
- * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
+ * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
+ * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
  * the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
  * one translog file present. Such an uncommitted translog file always has a translog-${gen}.ckp associated with it which is an fsynced copy of the it's last translog.ckp such that in
  * disaster recovery last fsynced offsets, number of operation etc. are still preserved.
 *
 */
-public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
+public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
 
     /*
      * TODO
@@ -804,6 +804,8 @@ public static Type fromId(byte id) {
 
         long seqNo();
 
+        long primaryTerm();
+
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
          * {@link Operation#writeType(Operation, StreamOutput)}
@@ -953,6 +955,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1104,6 +1107,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1180,6 +1184,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1347,6 +1352,31 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
         out.writeInt((int) checksum);
     }
 
+    /**
+     * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation
+     * if there is no generation that could contain any such sequence number.
+     *
+     * @param seqNo the sequence number
+     * @return the minimum generation for the sequence number
+     */
+    public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
+        try (ReleasableLock ignored = writeLock.acquire()) {
+            /*
+             * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
+             * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will
+             * be the current translog generation as we do not need any prior generations to have a complete history up to the current
+             * local checkpoint.
+             */
+            long minTranslogFileGeneration = this.currentFileGeneration();
+            for (final TranslogReader reader : readers) {
+                if (seqNo <= reader.getCheckpoint().maxSeqNo) {
+                    minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
+                }
+            }
+            return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
+        }
+    }
+
     /**
      * Roll the current translog generation into a new generation. This does not commit the
      * translog.
@@ -1375,27 +1405,38 @@ public void rollGeneration() throws IOException {
         }
     }
 
-    @Override
-    public long prepareCommit() throws IOException {
+    /**
+     * Prepares a translog commit by setting the current committing generation and rolling the translog generation.
+     *
+     * @throws IOException if an I/O exception occurred while rolling the translog generation
+     */
+    public void prepareCommit() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             ensureOpen();
             if (currentCommittingGeneration != NOT_SET_GENERATION) {
-                final String message = String.format(
-                    Locale.ROOT,
-                    "already committing a translog with generation [%d]",
-                    currentCommittingGeneration);
+                final String message =
+                    String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
                 throw new IllegalStateException(message);
             }
             currentCommittingGeneration = current.getGeneration();
             rollGeneration();
         }
-        return 0;
     }
 
-    @Override
-    public long commit() throws IOException {
+    /**
+     * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
+     * will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
+     *
+     * If {@link Translog#prepareCommit()} was not called before calling commit, this method will invoke it, causing the translog
+     * generation to be rolled.
+     *
+     * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
+     * @throws IOException if an I/O exception occurred preparing the translog commit
+     */
+    public void commit(final long committedGeneration) throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             ensureOpen();
+            assert assertCommittedGenerationIsInValidRange(committedGeneration);
             if (currentCommittingGeneration == NOT_SET_GENERATION) {
                 prepareCommit();
             }
@@ -1403,26 +1444,39 @@ public long commit() throws IOException {
             assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) : "readers missing committing generation [" + currentCommittingGeneration + "]";
             // set the last committed generation otherwise old files will not be cleaned up
-            lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
+            lastCommittedTranslogFileGeneration = committedGeneration;
             currentCommittingGeneration = NOT_SET_GENERATION;
             trimUnreferencedReaders();
         }
-        return 0;
     }
 
+    private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
+        assert committedGeneration <= current.generation
+            : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
+        final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
+        assert committedGeneration >= min
+            : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
+        return true;
+    }
+
+    /**
+     * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
+     * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
+     */
     void trimUnreferencedReaders() {
         try (ReleasableLock ignored = writeLock.acquire()) {
             if (closed.get()) {
-                // we're shutdown potentially on some tragic event - don't delete anything
+                // we're shutdown potentially on some tragic event, don't delete anything
                 return;
             }
-            long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
-            minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
-            final long finalMinReferencedGen = minReferencedGen;
-            List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
+            long minReferencedGen = Math.min(
+                lastCommittedTranslogFileGeneration,
+                outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
+            final List<TranslogReader> unreferenced =
+                readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
             for (final TranslogReader unreferencedReader : unreferenced) {
-                Path translogPath = unreferencedReader.path();
-                logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
+                final Path translogPath = unreferencedReader.path();
+                logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
                 IOUtils.closeWhileHandlingException(unreferencedReader);
                 IOUtils.deleteFilesIgnoringExceptions(translogPath,
                     translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
@@ -1442,13 +1496,6 @@ void closeFilesIfNoPendingViews() throws IOException {
         }
     }
 
-
-    @Override
-    public void rollback() throws IOException {
-        ensureOpen();
-        close();
-    }
-
     /**
      * References a transaction log generation
     */
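Note on getMinGenerationForSeqNo above: a generation must be kept when the maxSeqNo recorded in its checkpoint is at or above the requested sequence number, and immediately after a flush no generation qualifies, so the current generation is returned. Below is a self-contained sketch of the same selection rule, assuming the completed generations are summarized as a map from generation to checkpoint maxSeqNo; the names are illustrative, not the Translog API.

    import java.util.Map;

    class MinGenerationSketch {

        // A completed generation must be preserved when its checkpoint maxSeqNo is at
        // or above seqNo; if none qualifies, the current generation is the answer.
        static long minGenerationForSeqNo(
                final long seqNo, final long currentGeneration, final Map<Long, Long> maxSeqNoByGeneration) {
            long min = currentGeneration;
            for (final Map.Entry<Long, Long> entry : maxSeqNoByGeneration.entrySet()) {
                if (seqNo <= entry.getValue()) {
                    min = Math.min(min, entry.getKey());
                }
            }
            return min;
        }

    }

For example, with readers {1=9, 2=25, 3=19}, current generation 4, and local checkpoint 17, asking for sequence number 18 returns generation 2: generations 2 and 3 can both still hold operations above the checkpoint, so trimming must stop at 2.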
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index c78374a0e9e1e..6c9ca6adc963b 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -150,6 +150,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -165,6 +166,8 @@
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.shuffle;
@@ -833,6 +836,58 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
         }
     }
 
+    public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
+        final int docs = randomIntBetween(1, 4096);
+        final List<Long> seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
+        Randomness.shuffle(seqNos);
+        engine.close();
+        Engine initialEngine = null;
+        try {
+            final AtomicInteger counter = new AtomicInteger();
+            initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) {
+                @Override
+                public SequenceNumbersService seqNoService() {
+                    return new SequenceNumbersService(
+                            engine.shardId,
+                            engine.config().getIndexSettings(),
+                            SequenceNumbersService.NO_OPS_PERFORMED,
+                            SequenceNumbersService.NO_OPS_PERFORMED,
+                            SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+                        @Override
+                        public long generateSeqNo() {
+                            return seqNos.get(counter.getAndIncrement());
+                        }
+                    };
+                }
+            };
+            for (int i = 0; i < docs; i++) {
+                final String id = Integer.toString(i);
+                final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
+                initialEngine.index(indexForDoc(doc));
+                if (rarely()) {
+                    initialEngine.getTranslog().rollGeneration();
+                } else if (rarely()) {
+                    initialEngine.flush();
+                }
+            }
+        } finally {
+            IOUtils.close(initialEngine);
+        }
+
+        Engine recoveringEngine = null;
+        try {
+            recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            recoveringEngine.recoverFromTranslog();
+            try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
+                TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
+                assertEquals(docs, topDocs.totalHits);
+            }
+        } finally {
+            IOUtils.close(recoveringEngine);
+        }
+
+    }
+
     public void testConcurrentGetAndFlush() throws Exception {
         ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
         engine.index(indexForDoc(doc));
@@ -3357,48 +3412,68 @@ public void testSequenceIDs() throws Exception {
         searchResult.close();
     }
 
+    /**
+     * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier
+     * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
+     * value of {@code expectedLocalCheckpoint} is set accordingly.
+     *
+     * @param latchReference          to latch the thread for the purpose of stalling
+     * @param barrier                 to signal the thread has generated a new sequence number
+     * @param stall                   whether or not the thread should stall
+     * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
+     *                                number
+     * @return a sequence number service
+     */
+    private SequenceNumbersService getStallingSeqNoService(
+            final AtomicReference<CountDownLatch> latchReference,
+            final CyclicBarrier barrier,
+            final AtomicBoolean stall,
+            final AtomicLong expectedLocalCheckpoint) {
+        return new SequenceNumbersService(
+                shardId,
+                defaultSettings,
+                SequenceNumbersService.NO_OPS_PERFORMED,
+                SequenceNumbersService.NO_OPS_PERFORMED,
+                SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+            @Override
+            public long generateSeqNo() {
+                final long seqNo = super.generateSeqNo();
+                final CountDownLatch latch = latchReference.get();
+                if (stall.get()) {
+                    try {
+                        barrier.await();
+                        latch.await();
+                    } catch (BrokenBarrierException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    if (expectedLocalCheckpoint.get() + 1 == seqNo) {
+                        expectedLocalCheckpoint.set(seqNo);
+                    }
+                }
+                return seqNo;
+            }
+        };
+    }
+
     public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws BrokenBarrierException, InterruptedException, IOException {
         engine.close();
         final int docs = randomIntBetween(1, 32);
         InternalEngine initialEngine = null;
         try {
-            final CountDownLatch latch = new CountDownLatch(1);
+            final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>(new CountDownLatch(1));
             final CyclicBarrier barrier = new CyclicBarrier(2);
-            final AtomicBoolean skip = new AtomicBoolean();
+            final AtomicBoolean stall = new AtomicBoolean();
             final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
             final List<Thread> threads = new ArrayList<>();
-            final SequenceNumbersService seqNoService =
-                new SequenceNumbersService(
-                    shardId,
-                    defaultSettings,
-                    SequenceNumbersService.NO_OPS_PERFORMED,
-                    SequenceNumbersService.NO_OPS_PERFORMED,
-                    SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                    @Override
-                    public long generateSeqNo() {
-                        final long seqNo = super.generateSeqNo();
-                        if (skip.get()) {
-                            try {
-                                barrier.await();
-                                latch.await();
-                            } catch (BrokenBarrierException | InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                        } else {
-                            if (expectedLocalCheckpoint.get() + 1 == seqNo) {
-                                expectedLocalCheckpoint.set(seqNo);
-                            }
-                        }
-                        return seqNo;
-                    }
-                };
+            final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
             initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
             final InternalEngine finalInitialEngine = initialEngine;
             for (int i = 0; i < docs; i++) {
                 final String id = Integer.toString(i);
                 final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
-                skip.set(randomBoolean());
+                stall.set(randomBoolean());
                 final Thread thread = new Thread(() -> {
                     try {
                         finalInitialEngine.index(indexForDoc(doc));
@@ -3407,7 +3482,7 @@ public long generateSeqNo() {
                     }
                 });
                 thread.start();
-                if (skip.get()) {
+                if (stall.get()) {
                     threads.add(thread);
                     barrier.await();
                 } else {
@@ -3419,7 +3494,7 @@ public long generateSeqNo() {
             assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (docs - 1)));
             initialEngine.flush(true, true);
 
-            latch.countDown();
+            latchReference.get().countDown();
             for (final Thread thread : threads) {
                 thread.join();
             }
@@ -3594,6 +3669,78 @@ public long generateSeqNo() {
         }
     }
 
+    public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierException, InterruptedException {
+        engine.close();
+        final int numberOfTriplets = randomIntBetween(1, 32);
+        InternalEngine actualEngine = null;
+        try {
+            final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>();
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+            final AtomicBoolean stall = new AtomicBoolean();
+            final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
+            final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
+            final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
+            actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
+            final InternalEngine finalActualEngine = actualEngine;
+            final Translog translog = finalActualEngine.getTranslog();
+            final long generation = finalActualEngine.getTranslog().currentFileGeneration();
+            for (int i = 0; i < numberOfTriplets; i++) {
+                /*
+                 * Index three documents with the first and last landing in the same generation and the middle document being stalled until
+                 * a later generation.
+                 */
+                stall.set(false);
+                index(finalActualEngine, 3 * i);
+
+                final CountDownLatch latch = new CountDownLatch(1);
+                latchReference.set(latch);
+                final int skipId = 3 * i + 1;
+                stall.set(true);
+                final Thread thread = new Thread(() -> {
+                    try {
+                        index(finalActualEngine, skipId);
+                    } catch (IOException e) {
+                        throw new AssertionError(e);
+                    }
+                });
+                thread.start();
+                threads.put(thread, latch);
+                barrier.await();
+
+                stall.set(false);
+                index(finalActualEngine, 3 * i + 2);
+                finalActualEngine.flush();
+
+                /*
+                 * This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle
+                 * this sequence number.
+                 */
+                assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation));
+            }
+
+            int i = 0;
+            for (final Map.Entry<Thread, CountDownLatch> entry : threads.entrySet()) {
+                final Map<String, String> userData = finalActualEngine.commitStats().getUserData();
+                assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i)));
+                assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation)));
+                entry.getValue().countDown();
+                entry.getKey().join();
+                finalActualEngine.flush();
+                i++;
+            }
+
+        } finally {
+            IOUtils.close(actualEngine);
+        }
+    }
+
+    private void index(final InternalEngine engine, final int id) throws IOException {
+        final String docId = Integer.toString(id);
+        final ParsedDocument doc =
+                testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
+        engine.index(indexForDoc(doc));
+    }
+
     /**
      * Return a tuple representing the sequence ID for the given {@code Get}
      * operation. The first value in the tuple is the sequence number, the
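Note on the test refactoring above: getStallingSeqNoService drives both engine tests through a barrier-and-latch handshake. The test sets stall, starts a worker that draws a sequence number, and awaits the barrier so it knows the worker is parked with its sequence number before it has indexed; counting the latch down later lets the stalled operation complete and the local checkpoint advance. A stripped-down, runnable sketch of just that handshake, with all names invented for illustration:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicBoolean;

    public final class StallHandshake {
        public static void main(final String[] args) throws Exception {
            final CyclicBarrier barrier = new CyclicBarrier(2);
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean stall = new AtomicBoolean(true);
            final Thread worker = new Thread(() -> {
                // the worker "generates a sequence number" and then stalls
                if (stall.get()) {
                    try {
                        barrier.await(); // signal: sequence number taken
                        latch.await();   // park until the test releases us
                    } catch (BrokenBarrierException | InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                // ... indexing would happen here ...
            });
            worker.start();
            barrier.await();   // the test now knows a gap exists below max_seq_no
            latch.countDown(); // release the stalled "operation"
            worker.join();
        }
    }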
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 6b2aa5e59215e..fa0e1259f0fbc 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -36,8 +36,10 @@
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -84,6 +86,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -101,6 +104,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.hamcrest.Matchers.containsString;
@@ -124,7 +128,7 @@ protected void afterIfSuccessful() throws Exception {
 
         if (translog.isOpen()) {
             if (translog.currentFileGeneration() > 1) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 assertFileDeleted(translog, translog.currentFileGeneration() - 1);
             }
             translog.close();
@@ -287,7 +291,7 @@ public void testSimpleOperations() throws IOException {
         assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
         assertThat(snapshot.totalOperations(), equalTo(ops.size()));
 
-        translog.commit();
+        translog.commit(translog.currentFileGeneration());
         snapshot = translog.newSnapshot();
         assertThat(snapshot, SnapshotMatchers.size(0));
         assertThat(snapshot.totalOperations(), equalTo(0));
@@ -373,7 +377,7 @@ public void testStats() throws IOException {
             }
         }
 
-        translog.commit();
+        translog.commit(translog.currentFileGeneration());
         {
             final TranslogStats stats = stats();
             assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
@@ -446,7 +450,7 @@ public void testSnapshotWithNewTranslog() throws IOException {
 
         try (Translog.View view = translog.newView()) {
             Translog.Snapshot snapshot2 = translog.newSnapshot();
-            translog.commit();
+            translog.commit(translog.currentFileGeneration());
             assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
             assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
         }
@@ -821,7 +825,7 @@ protected void doRun() throws Exception {
                             break;
                         }
                     }
-                    translog.commit();
+                    translog.commit(translog.currentFileGeneration());
                 }
             } finally {
                 run.set(false);
@@ -858,7 +862,7 @@ public void testSyncUpTo() throws IOException {
                 assertTrue("we only synced a previous operation yet", translog.syncNeeded());
             }
             if (rarely()) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             }
@@ -878,7 +882,7 @@ public void testSyncUpToStream() throws IOException {
         ArrayList<Translog.Location> locations = new ArrayList<>();
         for (int op = 0; op < translogOperations; op++) {
             if (rarely()) {
-                translog.commit(); // do this first so that there is at least one pending tlog entry
+                translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
             }
             final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
             locations.add(location);
@@ -889,7 +893,7 @@ public void testSyncUpToStream() throws IOException {
                 assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
                 assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
             } else if (rarely()) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             } else {
@@ -909,7 +913,7 @@ public void testLocationComparison() throws IOException {
         for (int op = 0; op < translogOperations; op++) {
             locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
             if (rarely() && translogOperations > op + 1) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
             }
         }
         Collections.shuffle(locations, random());
@@ -1074,7 +1078,7 @@ public void testBasicRecovery() throws IOException {
             locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             final boolean commit = commitOften ? frequently() : rarely();
             if (commit && op < translogOperations - 1) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 minUncommittedOp = op + 1;
                 translogGeneration = translog.getGeneration();
             }
@@ -1300,7 +1304,7 @@ public void testOpenForeignTranslog() throws IOException {
         for (int op = 0; op < translogOperations; op++) {
             locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             if (randomBoolean()) {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 firstUncommitted = op + 1;
             }
         }
@@ -1483,7 +1487,7 @@ public void testFailFlush() throws IOException {
             }
 
             try {
-                translog.commit();
+                translog.commit(translog.currentFileGeneration());
                 fail("already closed");
             } catch (AlreadyClosedException ex) {
                 assertNotNull(ex.getCause());
@@ -1930,7 +1934,7 @@ public void testWithRandomException() throws IOException {
                     if (randomBoolean()) {
                         failableTLog.prepareCommit();
                     }
-                    failableTLog.commit();
+                    failableTLog.commit(translog.currentFileGeneration());
                     syncedDocs.clear();
                 }
             }
@@ -2110,12 +2114,13 @@ public void testRollGeneration() throws IOException {
         for (int i = 0; i <= rolls; i++) {
             assertFileIsPresent(translog, generation + i);
        }
 
-        translog.commit();
+        translog.commit(generation + rolls);
         assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
         assertThat(translog.totalOperations(), equalTo(0));
-        for (int i = 0; i <= rolls; i++) {
+        for (int i = 0; i < rolls; i++) {
             assertFileDeleted(translog, generation + i);
         }
+        assertFileIsPresent(translog, generation + rolls);
         assertFileIsPresent(translog, generation + rolls + 1);
     }
@@ -2167,7 +2172,7 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
             }
         }
 
-        translog.commit();
+        translog.commit(generation + rollsBefore + 1);
 
         for (int i = 0; i <= rollsBefore; i++) {
             assertFileDeleted(translog, generation + i);
@@ -2178,4 +2183,130 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
 
     }
 
+    public void testMinGenerationForSeqNo() throws IOException {
+        final long initialGeneration = translog.getGeneration().translogFileGeneration;
+        final int operations = randomIntBetween(1, 4096);
+        final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
+        Randomness.shuffle(shuffledSeqNos);
+        final List<Tuple<Long, Long>> seqNos = new ArrayList<>();
+        final Map<Long, Long> terms = new HashMap<>();
+        for (final Long seqNo : shuffledSeqNos) {
+            seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
+            Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
+            seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
+        }
+
+        for (final Tuple<Long, Long> tuple : seqNos) {
+            translog.add(new Translog.NoOp(tuple.v1(), tuple.v2(), "test"));
+            if (rarely()) {
+                translog.rollGeneration();
+            }
+        }
+
+        Map<Long, Set<Tuple<Long, Long>>> generations = new HashMap<>();
+
+        translog.commit(initialGeneration);
+        for (long seqNo = 0; seqNo < operations; seqNo++) {
+            final Set<Tuple<Long, Long>> seenSeqNos = new HashSet<>();
+            final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
+            for (long g = generation; g < translog.currentFileGeneration(); g++) {
+                if (!generations.containsKey(g)) {
+                    final Set<Tuple<Long, Long>> generationSeenSeqNos = new HashSet<>();
+                    final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
+                    try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
+                        Translog.Snapshot snapshot = reader.newSnapshot();
+                        Translog.Operation operation;
+                        while ((operation = snapshot.next()) != null) {
+                            generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
+                        }
+                    }
+                    generations.put(g, generationSeenSeqNos);
+
+                }
+                seenSeqNos.addAll(generations.get(g));
+            }
+
+            final long seqNoLowerBound = seqNo;
+            final Set<Tuple<Long, Long>> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet());
+            seenSeqNos.retainAll(expected);
+            assertThat(seenSeqNos, equalTo(expected));
+        }
+    }
+
+    public void testSimpleCommit() throws IOException {
+        final int operations = randomIntBetween(1, 4096);
+        long seqNo = 0;
+        for (int i = 0; i < operations; i++) {
+            translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+            if (rarely()) {
+                translog.rollGeneration();
+            }
+        }
+
+        final long generation =
+                randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
+        translog.commit(generation);
+        for (long i = 0; i < generation; i++) {
+            assertFileDeleted(translog, i);
+        }
+        for (long i = generation; i <= translog.currentFileGeneration(); i++) {
+            assertFileIsPresent(translog, i);
+        }
+    }
+
+    public void testPrepareCommitAndCommit() throws IOException {
+        final int operations = randomIntBetween(1, 4096);
+        long seqNo = 0;
+        long last = -1;
+        for (int i = 0; i < operations; i++) {
+            translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+            if (rarely()) {
+                final long generation = translog.currentFileGeneration();
+                translog.prepareCommit();
+                if (rarely()) {
+                    // simulate generation filling up and rolling between preparing the commit and committing
+                    translog.rollGeneration();
+                }
+                final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
+                translog.commit(committedGeneration);
+                last = committedGeneration;
+                for (long g = 0; g < generation; g++) {
+                    assertFileDeleted(translog, g);
+                }
+                for (long g = generation; g < translog.currentFileGeneration(); g++) {
+                    assertFileIsPresent(translog, g);
+                }
+            }
+        }
+    }
+
+    public void testCommitWithOpenView() throws IOException {
+        final int operations = randomIntBetween(1, 4096);
+        long seqNo = 0;
+        long lastCommittedGeneration = -1;
+        for (int i = 0; i < operations; i++) {
+            translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+            if (rarely()) {
+                try (Translog.View ignored = translog.newView()) {
+                    final long viewGeneration = lastCommittedGeneration;
+                    translog.prepareCommit();
+                    final long committedGeneration = randomIntBetween(
+                            Math.max(1, Math.toIntExact(lastCommittedGeneration)),
+                            Math.toIntExact(translog.currentFileGeneration()));
+                    translog.commit(committedGeneration);
+                    lastCommittedGeneration = committedGeneration;
+                    // with an open view, committing should preserve generations back to the last committed generation
+                    for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
+                        assertFileDeleted(translog, g);
+                    }
+                    // the view generation could be -1 if no commit has been performed
+                    final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
+                    for (long g = max; g < translog.currentFileGeneration(); g++) {
+                        assertFileIsPresent(translog, g);
+                    }
+                }
+            }
+        }
+    }
+
 }
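Note on the new translog tests above: testSimpleCommit and testCommitWithOpenView pin down the trimming contract, namely that after commit(committedGeneration) a generation survives when an open view still references it or when it is at or above the committed generation. A toy model of the rule in trimUnreferencedReaders, assuming each view is summarized by the minimum generation it references; the names are illustrative.

    import java.util.List;
    import java.util.stream.Collectors;

    class TrimSketch {

        // Mirrors trimUnreferencedReaders: minReferencedGen is the smaller of the last
        // committed generation and the minimum generation referenced by any open view;
        // everything below it is eligible for deletion.
        static List<Long> survivingGenerations(
                final List<Long> generations, final long lastCommittedGeneration, final List<Long> viewGenerations) {
            final long minView = viewGenerations.stream().min(Long::compareTo).orElse(Long.MAX_VALUE);
            final long minReferencedGen = Math.min(lastCommittedGeneration, minView);
            return generations.stream().filter(g -> g >= minReferencedGen).collect(Collectors.toList());
        }

    }

With no open views the stream minimum defaults to Long.MAX_VALUE, so the committed generation alone decides what is kept, which is exactly what testSimpleCommit asserts.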