diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 333dd769eaf68..a56335d7599e3 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -298,7 +298,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation.translogUUID == null) {
- throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
+ throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
- commitIndexWriter(indexWriter, translog, null);
+ final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
- translog.commit();
+ translog.commit(committedGeneration);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
@@ -1680,55 +1680,65 @@ protected void doRun() throws Exception {
}
}
- private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+ /**
+ * Commits the specified index writer.
+ *
+ * @param writer the index writer to commit
+ * @param translog the translog
+ * @param syncId the sync flush ID ({@code null} if not committing a synced flush)
+ * @return the minimum translog generation for the local checkpoint committed with the specified index writer
+ * @throws IOException if an I/O exception occurs committing the specified writer
+ */
+ private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
- Translog.TranslogGeneration translogGeneration = translog.getGeneration();
-
- final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
+ final long localCheckpoint = seqNoService().getLocalCheckpoint();
+ final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
+ final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
- final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
+ final String localCheckpointValue = Long.toString(localCheckpoint);
writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
- * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
+ * segments, including the local checkpoint amongst other values. The maximum sequence number is different; we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
- * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
- * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
- * of the commit data iterator (which occurs after all documents have been flushed to Lucene).
+ * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
+ * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
+ * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
- final Map<String, String> commitData = new HashMap<>(6);
- commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
+ final Map<String, String> commitData = new HashMap<>(5);
+ commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
- commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
+ commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
- if (logger.isTraceEnabled()) {
- logger.trace("committing writer with commit data [{}]", commitData);
- }
+ logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
writer.commit();
- } catch (Exception ex) {
+ return translogGeneration.translogFileGeneration;
+ } catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
- } catch (Exception inner) {
+ } catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
- } catch (AssertionError e) {
- // IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
- // randomly throw FNFE/NSFE can also hit this:
+ } catch (final AssertionError e) {
+ /*
+ * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
+ * throw FileNotFoundException or NoSuchFileException can also hit this.
+ */
if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
- EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
+ final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
try {
failEngine("lucene commit failed", engineException);
- } catch (Exception inner) {
+ } catch (final Exception inner) {
engineException.addSuppressed(inner);
}
throw engineException;
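
Reviewer note: the comment in this hunk hinges on `setLiveCommitData` deferring evaluation until commit time. A minimal standalone sketch of that Lucene behavior (the `maxSeqNo` counter is a hypothetical stand-in for the sequence number service, not engine code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

/** Demonstrates that live commit data is evaluated at commit time, not when it is set. */
public class DeferredCommitDataSketch {
    public static void main(final String[] args) throws Exception {
        final AtomicLong maxSeqNo = new AtomicLong(0); // hypothetical stand-in for seqNoService().getMaxSeqNo()
        try (IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig())) {
            writer.setLiveCommitData(() -> {
                // this lambda runs inside IndexWriter#commit, after all documents are flushed
                final Map<String, String> commitData = new HashMap<>(1);
                commitData.put("max_seq_no", Long.toString(maxSeqNo.get()));
                return commitData.entrySet().iterator();
            });
            writer.addDocument(new Document());
            maxSeqNo.set(42); // advances after setLiveCommitData was called
            writer.commit(); // the commit point records max_seq_no=42, not 0
        }
    }
}
```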
@@ -1812,7 +1822,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
- Map<String, String> commitData = new HashMap<>(6);
+ Map<String, String> commitData = new HashMap<>(5);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
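
The flush-path change is easiest to see end to end. A toy model of the sequencing (`FakeTranslog` is a hypothetical stand-in; only the ordering and the threaded-through generation mirror this patch):

```java
/** Toy model of the new flush sequencing; FakeTranslog is a hypothetical stand-in, not engine code. */
public class FlushSequencingSketch {

    static class FakeTranslog {
        long currentGeneration = 1;
        long lastCommittedGeneration = 1;

        void prepareCommit() {
            currentGeneration++; // roll to a fresh generation before the Lucene commit
        }

        void commit(final long committedGeneration) {
            // the caller now dictates the oldest generation that must survive trimming
            lastCommittedGeneration = committedGeneration;
        }
    }

    public static void main(final String[] args) {
        final FakeTranslog translog = new FakeTranslog();
        translog.prepareCommit();
        // previously translog.commit() assumed everything below the committing generation
        // was trimmable; now the Lucene commit reports the minimum generation it still
        // needs, e.g. generation 1 if an operation above the local checkpoint lives there
        final long committedGeneration = 1;
        translog.commit(committedGeneration);
        System.out.println("generations >= " + translog.lastCommittedGeneration + " survive");
    }
}
```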
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index d9a8cc408f822..014500230a404 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
* When a translog is opened the checkpoint is used to retrieve the latest translog file generation and subsequently to open the last written file to recover operations.
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed, is compared against
* the latest generation and all consecutive translog files since the given generation and the last generation in the checkpoint will be recovered and preserved until the next
- * generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
- * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
+ * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
+ * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
* the currently being committed translog file will not be deleted since its commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
* one translog file present. Such an uncommitted translog file always has a translog-${gen}.ckp associated with it which is an fsynced copy of its last translog.ckp such that in
* disaster recovery the last fsynced offsets, number of operations, etc. are still preserved.
*
*/
-public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
+public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
/*
* TODO
@@ -804,6 +804,8 @@ public static Type fromId(byte id) {
long seqNo();
+ long primaryTerm();
+
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
@@ -953,6 +955,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1104,6 +1107,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1180,6 +1184,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1347,6 +1352,31 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}
+ /**
+ * Gets the minimum generation that could contain any sequence number at or above the specified sequence number, or the current
+ * generation if there is no generation that could contain any such sequence number.
+ *
+ * @param seqNo the sequence number
+ * @return the minimum generation for the sequence number
+ */
+ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
+ try (ReleasableLock ignored = writeLock.acquire()) {
+ /*
+ * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
+ * local checkpoint. Immediately after flushing, there will be no such generation, so in this case the minimum generation will
+ * be the current translog generation, as we do not need any prior generations to have a complete history up to the current
+ * local checkpoint.
+ */
+ long minTranslogFileGeneration = this.currentFileGeneration();
+ for (final TranslogReader reader : readers) {
+ if (seqNo <= reader.getCheckpoint().maxSeqNo) {
+ minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
+ }
+ }
+ return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
+ }
+ }
+
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
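
The reader scan in `getMinGenerationForSeqNo` is worth modeling in isolation. A self-contained sketch with invented data (`GenInfo` is hypothetical; the loop mirrors the scan above):

```java
import java.util.Arrays;
import java.util.List;

/** Standalone model of the reader scan in getMinGenerationForSeqNo; GenInfo is invented. */
public class MinGenerationSketch {

    static final class GenInfo {
        final long generation;
        final long maxSeqNo; // from the generation's checkpoint

        GenInfo(final long generation, final long maxSeqNo) {
            this.generation = generation;
            this.maxSeqNo = maxSeqNo;
        }
    }

    static long minGenerationForSeqNo(final long seqNo, final List<GenInfo> readers, final long currentGeneration) {
        // default to the current generation: right after a flush, no completed
        // generation can contain seqNo or anything after it
        long min = currentGeneration;
        for (final GenInfo reader : readers) {
            if (seqNo <= reader.maxSeqNo) {
                min = Math.min(min, reader.generation);
            }
        }
        return min;
    }

    public static void main(final String[] args) {
        // generation 2 straddles seqNo 5 (its max seq no is 7), so it must be preserved
        final List<GenInfo> readers = Arrays.asList(new GenInfo(1, 3), new GenInfo(2, 7), new GenInfo(3, 4));
        System.out.println(minGenerationForSeqNo(5, readers, 4)); // prints 2
    }
}
```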
@@ -1375,27 +1405,38 @@ public void rollGeneration() throws IOException {
}
}
- @Override
- public long prepareCommit() throws IOException {
+ /**
+ * Prepares a translog commit by setting the current committing generation and rolling the translog generation.
+ *
+ * @throws IOException if an I/O exception occurred while rolling the translog generation
+ */
+ public void prepareCommit() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
- final String message = String.format(
- Locale.ROOT,
- "already committing a translog with generation [%d]",
- currentCommittingGeneration);
+ final String message =
+ String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
- return 0;
}
- @Override
- public long commit() throws IOException {
+ /**
+ * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
+ * will be used when trimming unreferenced translog generations such that generations from the committed generation onwards will be
+ * preserved.
+ *
+ * If {@link Translog#prepareCommit()} was not called before calling commit, it will be invoked automatically, causing the translog
+ * generation to be rolled.
+ *
+ * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
+ * @throws IOException if an I/O exception occurred preparing the translog commit
+ */
+ public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
+ assert assertCommittedGenerationIsInValidRange(committedGeneration);
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
@@ -1403,26 +1444,39 @@ public long commit() throws IOException {
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
- lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
+ lastCommittedTranslogFileGeneration = committedGeneration;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
- return 0;
}
+ private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
+ assert committedGeneration <= current.generation
+ : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
+ final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
+ assert committedGeneration >= min
+ : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
+ return true;
+ }
+
+ /**
+ * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
+ * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
+ */
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
- // we're shutdown potentially on some tragic event - don't delete anything
+ // we're shutdown potentially on some tragic event, don't delete anything
return;
}
- long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
- minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
- final long finalMinReferencedGen = minReferencedGen;
- List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
+ long minReferencedGen = Math.min(
+ lastCommittedTranslogFileGeneration,
+ outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
+ final List<TranslogReader> unreferenced =
+ readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
- Path translogPath = unreferencedReader.path();
- logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
+ final Path translogPath = unreferencedReader.path();
+ logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
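
The trimming bound above is just a two-way minimum over the last committed generation and the outstanding views. A tiny self-contained check (values invented):

```java
import java.util.Arrays;
import java.util.List;

/** Self-contained check of the trimming bound in trimUnreferencedReaders; values are invented. */
public class TrimBoundSketch {
    public static void main(final String[] args) {
        final long lastCommittedTranslogFileGeneration = 7;
        final List<Long> viewMinGenerations = Arrays.asList(5L, 9L); // outstanding views
        final long minReferencedGen = Math.min(
                lastCommittedTranslogFileGeneration,
                viewMinGenerations.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE));
        // generations strictly below 5 are deleted; 5 and above are preserved
        System.out.println(minReferencedGen); // prints 5
    }
}
```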
@@ -1442,13 +1496,6 @@ void closeFilesIfNoPendingViews() throws IOException {
}
}
-
- @Override
- public void rollback() throws IOException {
- ensureOpen();
- close();
- }
-
/**
* References a transaction log generation
*/
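
With `TwoPhaseCommit` and `rollback` gone, callers drive both phases explicitly. A hedged usage sketch that compiles against this patch's API and mirrors the engine's flush path (the `localCheckpoint` parameter is supplied by the caller):

```java
import java.io.IOException;

import org.elasticsearch.index.translog.Translog;

/** Usage sketch for the new two-phase API; mirrors the engine's flush path. */
final class TranslogCommitUsage {

    static void flushTranslog(final Translog translog, final long localCheckpoint) throws IOException {
        translog.prepareCommit(); // rolls the generation and marks it as committing
        final long minRequiredGeneration =
                translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
        translog.commit(minRequiredGeneration); // trims generations below the committed one
    }
}
```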
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index c78374a0e9e1e..6c9ca6adc963b 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -150,6 +150,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -165,6 +166,8 @@
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.shuffle;
@@ -833,6 +836,58 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
}
}
+ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
+ final int docs = randomIntBetween(1, 4096);
+ final List<Long> seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
+ Randomness.shuffle(seqNos);
+ engine.close();
+ Engine initialEngine = null;
+ try {
+ final AtomicInteger counter = new AtomicInteger();
+ initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) {
+ @Override
+ public SequenceNumbersService seqNoService() {
+ return new SequenceNumbersService(
+ engine.shardId,
+ engine.config().getIndexSettings(),
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ @Override
+ public long generateSeqNo() {
+ return seqNos.get(counter.getAndIncrement());
+ }
+ };
+ }
+ };
+ for (int i = 0; i < docs; i++) {
+ final String id = Integer.toString(i);
+ final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
+ initialEngine.index(indexForDoc(doc));
+ if (rarely()) {
+ initialEngine.getTranslog().rollGeneration();
+ } else if (rarely()) {
+ initialEngine.flush();
+ }
+ }
+ } finally {
+ IOUtils.close(initialEngine);
+ }
+
+ Engine recoveringEngine = null;
+ try {
+ recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+ recoveringEngine.recoverFromTranslog();
+ try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
+ TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
+ assertEquals(docs, topDocs.totalHits);
+ }
+ } finally {
+ IOUtils.close(recoveringEngine);
+ }
+
+ }
+
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@@ -3357,48 +3412,68 @@ public void testSequenceIDs() throws Exception {
searchResult.close();
}
+ /**
+ * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier
+ * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
+ * value of {@code expectedLocalCheckpoint} is set accordingly.
+ *
+ * @param latchReference to latch the thread for the purpose of stalling
+ * @param barrier to signal the thread has generated a new sequence number
+ * @param stall whether or not the thread should stall
+ * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
+ * number
+ * @return a sequence number service
+ */
+ private SequenceNumbersService getStallingSeqNoService(
+ final AtomicReference<CountDownLatch> latchReference,
+ final CyclicBarrier barrier,
+ final AtomicBoolean stall,
+ final AtomicLong expectedLocalCheckpoint) {
+ return new SequenceNumbersService(
+ shardId,
+ defaultSettings,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ @Override
+ public long generateSeqNo() {
+ final long seqNo = super.generateSeqNo();
+ final CountDownLatch latch = latchReference.get();
+ if (stall.get()) {
+ try {
+ barrier.await();
+ latch.await();
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ if (expectedLocalCheckpoint.get() + 1 == seqNo) {
+ expectedLocalCheckpoint.set(seqNo);
+ }
+ }
+ return seqNo;
+ }
+ };
+ }
+
public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws BrokenBarrierException, InterruptedException, IOException {
engine.close();
final int docs = randomIntBetween(1, 32);
InternalEngine initialEngine = null;
try {
- final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>(new CountDownLatch(1));
final CyclicBarrier barrier = new CyclicBarrier(2);
- final AtomicBoolean skip = new AtomicBoolean();
+ final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
final List<Thread> threads = new ArrayList<>();
- final SequenceNumbersService seqNoService =
- new SequenceNumbersService(
- shardId,
- defaultSettings,
- SequenceNumbersService.NO_OPS_PERFORMED,
- SequenceNumbersService.NO_OPS_PERFORMED,
- SequenceNumbersService.UNASSIGNED_SEQ_NO) {
- @Override
- public long generateSeqNo() {
- final long seqNo = super.generateSeqNo();
- if (skip.get()) {
- try {
- barrier.await();
- latch.await();
- } catch (BrokenBarrierException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- if (expectedLocalCheckpoint.get() + 1 == seqNo) {
- expectedLocalCheckpoint.set(seqNo);
- }
- }
- return seqNo;
- }
- };
+ final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalInitialEngine = initialEngine;
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
- skip.set(randomBoolean());
+ stall.set(randomBoolean());
final Thread thread = new Thread(() -> {
try {
finalInitialEngine.index(indexForDoc(doc));
@@ -3407,7 +3482,7 @@ public long generateSeqNo() {
}
});
thread.start();
- if (skip.get()) {
+ if (stall.get()) {
threads.add(thread);
barrier.await();
} else {
@@ -3419,7 +3494,7 @@ public long generateSeqNo() {
assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (docs - 1)));
initialEngine.flush(true, true);
- latch.countDown();
+ latchReference.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
@@ -3594,6 +3669,78 @@ public long generateSeqNo() {
}
}
+ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierException, InterruptedException {
+ engine.close();
+ final int numberOfTriplets = randomIntBetween(1, 32);
+ InternalEngine actualEngine = null;
+ try {
+ final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>();
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ final AtomicBoolean stall = new AtomicBoolean();
+ final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
+ final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
+ final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
+ actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
+ final InternalEngine finalActualEngine = actualEngine;
+ final Translog translog = finalActualEngine.getTranslog();
+ final long generation = finalActualEngine.getTranslog().currentFileGeneration();
+ for (int i = 0; i < numberOfTriplets; i++) {
+ /*
+ * Index three documents with the first and last landing in the same generation and the middle document being stalled until
+ * a later generation.
+ */
+ stall.set(false);
+ index(finalActualEngine, 3 * i);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ latchReference.set(latch);
+ final int skipId = 3 * i + 1;
+ stall.set(true);
+ final Thread thread = new Thread(() -> {
+ try {
+ index(finalActualEngine, skipId);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ thread.start();
+ threads.put(thread, latch);
+ barrier.await();
+
+ stall.set(false);
+ index(finalActualEngine, 3 * i + 2);
+ finalActualEngine.flush();
+
+ /*
+ * This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle
+ * this sequence number.
+ */
+ assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation));
+ }
+
+ int i = 0;
+ for (final Map.Entry<Thread, CountDownLatch> entry : threads.entrySet()) {
+ final Map<String, String> userData = finalActualEngine.commitStats().getUserData();
+ assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i)));
+ assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation)));
+ entry.getValue().countDown();
+ entry.getKey().join();
+ finalActualEngine.flush();
+ i++;
+ }
+
+ } finally {
+ IOUtils.close(actualEngine);
+ }
+ }
+
+ private void index(final InternalEngine engine, final int id) throws IOException {
+ final String docId = Integer.toString(id);
+ final ParsedDocument doc =
+ testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
+ engine.index(indexForDoc(doc));
+ }
+
/**
* Return a tuple representing the sequence ID for the given {@code Get}
* operation. The first value in the tuple is the sequence number, the
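
Reviewer note: the refactored tests above rely on a barrier-plus-latch handshake to park an in-flight operation. The pattern in isolation (self-contained; nothing here is engine code):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/** The barrier-plus-latch handshake used by getStallingSeqNoService, in isolation. */
public class StallPatternSketch {
    public static void main(final String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(1);
        final Thread stalled = new Thread(() -> {
            try {
                barrier.await(); // signal the main thread that the sequence number was generated
                latch.await();   // park, keeping the operation out of any translog generation
            } catch (final BrokenBarrierException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        stalled.start();
        barrier.await();   // the test proceeds knowing exactly one operation is parked
        // ... roll generations, flush, assert on commit data ...
        latch.countDown(); // release the parked operation
        stalled.join();
    }
}
```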
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 6b2aa5e59215e..fa0e1259f0fbc 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -36,8 +36,10 @@
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -84,6 +86,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -101,6 +104,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.hamcrest.Matchers.containsString;
@@ -124,7 +128,7 @@ protected void afterIfSuccessful() throws Exception {
if (translog.isOpen()) {
if (translog.currentFileGeneration() > 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFileDeleted(translog, translog.currentFileGeneration() - 1);
}
translog.close();
@@ -287,7 +291,7 @@ public void testSimpleOperations() throws IOException {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.totalOperations(), equalTo(0));
@@ -373,7 +377,7 @@ public void testStats() throws IOException {
}
}
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
@@ -446,7 +450,7 @@ public void testSnapshotWithNewTranslog() throws IOException {
try (Translog.View view = translog.newView()) {
Translog.Snapshot snapshot2 = translog.newSnapshot();
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
}
@@ -821,7 +825,7 @@ protected void doRun() throws Exception {
break;
}
}
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
}
} finally {
run.set(false);
@@ -858,7 +862,7 @@ public void testSyncUpTo() throws IOException {
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
}
if (rarely()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
}
@@ -878,7 +882,7 @@ public void testSyncUpToStream() throws IOException {
ArrayList<Translog.Location> locations = new ArrayList<>();
for (int op = 0; op < translogOperations; op++) {
if (rarely()) {
- translog.commit(); // do this first so that there is at least one pending tlog entry
+ translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
}
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
locations.add(location);
@@ -889,7 +893,7 @@ public void testSyncUpToStream() throws IOException {
assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
} else if (rarely()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
} else {
@@ -909,7 +913,7 @@ public void testLocationComparison() throws IOException {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op + 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
}
}
Collections.shuffle(locations, random());
@@ -1074,7 +1078,7 @@ public void testBasicRecovery() throws IOException {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
@@ -1300,7 +1304,7 @@ public void testOpenForeignTranslog() throws IOException {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
firstUncommitted = op + 1;
}
}
@@ -1483,7 +1487,7 @@ public void testFailFlush() throws IOException {
}
try {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
fail("already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
@@ -1930,7 +1934,7 @@ public void testWithRandomException() throws IOException {
if (randomBoolean()) {
failableTLog.prepareCommit();
}
- failableTLog.commit();
+ failableTLog.commit(failableTLog.currentFileGeneration());
syncedDocs.clear();
}
}
@@ -2110,12 +2114,13 @@ public void testRollGeneration() throws IOException {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
- translog.commit();
+ translog.commit(generation + rolls);
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
assertThat(translog.totalOperations(), equalTo(0));
- for (int i = 0; i <= rolls; i++) {
+ for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
}
+ assertFileIsPresent(translog, generation + rolls);
assertFileIsPresent(translog, generation + rolls + 1);
}
@@ -2167,7 +2172,7 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
}
}
- translog.commit();
+ translog.commit(generation + rollsBefore + 1);
for (int i = 0; i <= rollsBefore; i++) {
assertFileDeleted(translog, generation + i);
@@ -2178,4 +2183,130 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
}
+ public void testMinGenerationForSeqNo() throws IOException {
+ final long initialGeneration = translog.getGeneration().translogFileGeneration;
+ final int operations = randomIntBetween(1, 4096);
+ final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
+ Randomness.shuffle(shuffledSeqNos);
+ final List<Tuple<Long, Long>> seqNos = new ArrayList<>();
+ final Map<Long, Long> terms = new HashMap<>();
+ for (final Long seqNo : shuffledSeqNos) {
+ seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
+ Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
+ seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
+ }
+
+ for (final Tuple<Long, Long> tuple : seqNos) {
+ translog.add(new Translog.NoOp(tuple.v1(), tuple.v2(), "test"));
+ if (rarely()) {
+ translog.rollGeneration();
+ }
+ }
+
+ Map<Long, Set<Tuple<Long, Long>>> generations = new HashMap<>();
+
+ translog.commit(initialGeneration);
+ for (long seqNo = 0; seqNo < operations; seqNo++) {
+ final Set<Tuple<Long, Long>> seenSeqNos = new HashSet<>();
+ final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
+ for (long g = generation; g < translog.currentFileGeneration(); g++) {
+ if (!generations.containsKey(g)) {
+ final Set<Tuple<Long, Long>> generationSeenSeqNos = new HashSet<>();
+ final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
+ try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
+ Translog.Snapshot snapshot = reader.newSnapshot();
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
+ }
+ }
+ generations.put(g, generationSeenSeqNos);
+
+ }
+ seenSeqNos.addAll(generations.get(g));
+ }
+
+ final long seqNoLowerBound = seqNo;
+ final Set<Tuple<Long, Long>> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet());
+ seenSeqNos.retainAll(expected);
+ assertThat(seenSeqNos, equalTo(expected));
+ }
+ }
+
+ public void testSimpleCommit() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ long seqNo = 0;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test'"));
+ if (rarely()) {
+ translog.rollGeneration();
+ }
+ }
+
+ final long generation =
+ randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
+ translog.commit(generation);
+ for (long i = 0; i < generation; i++) {
+ assertFileDeleted(translog, i);
+ }
+ for (long i = generation; i <= translog.currentFileGeneration(); i++) {
+ assertFileIsPresent(translog, i);
+ }
+ }
+
+ public void testPrepareCommitAndCommit() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ long seqNo = 0;
+ long last = -1;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+ if (rarely()) {
+ final long generation = translog.currentFileGeneration();
+ translog.prepareCommit();
+ if (rarely()) {
+ // simulate generation filling up and rolling between preparing the commit and committing
+ translog.rollGeneration();
+ }
+ final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
+ translog.commit(committedGeneration);
+ last = committedGeneration;
+ for (long g = 0; g < committedGeneration; g++) {
+ assertFileDeleted(translog, g);
+ }
+ for (long g = committedGeneration; g < translog.currentFileGeneration(); g++) {
+ assertFileIsPresent(translog, g);
+ }
+ }
+ }
+ }
+
+ public void testCommitWithOpenView() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ long seqNo = 0;
+ long lastCommittedGeneration = -1;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+ if (rarely()) {
+ try (Translog.View ignored = translog.newView()) {
+ final long viewGeneration = lastCommittedGeneration;
+ translog.prepareCommit();
+ final long committedGeneration = randomIntBetween(
+ Math.max(1, Math.toIntExact(lastCommittedGeneration)),
+ Math.toIntExact(translog.currentFileGeneration()));
+ translog.commit(committedGeneration);
+ lastCommittedGeneration = committedGeneration;
+ // with an open view, committing should preserve generations back to the last committed generation
+ for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
+ assertFileDeleted(translog, g);
+ }
+ // the view generation could be -1 if no commit has been performed
+ final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
+ for (long g = max; g < translog.currentFileGeneration(); g++) {
+ assertFileIsPresent(translog, g);
+ }
+ }
+ }
+ }
+ }
+
}
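
The assertions in testCommitWithOpenView reduce to one clamp. A companion sketch of that arithmetic (pure computation, no translog needed; values invented):

```java
/** Companion sketch for testCommitWithOpenView: the lowest generation that must survive. */
public class SurvivingGenerationsSketch {

    static long lowestPreservedGeneration(final long committedGeneration, final long viewGeneration) {
        // a view taken before any commit carries -1, so clamp to the first real generation
        return Math.max(1, Math.min(committedGeneration, viewGeneration));
    }

    public static void main(final String[] args) {
        System.out.println(lowestPreservedGeneration(7, 4));  // 4: the open view pins older generations
        System.out.println(lowestPreservedGeneration(3, -1)); // 1: no prior commit, everything is kept
    }
}
```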