From ba2ec6a6c9ed1f3832f53f5e56cb7d4a5099a6f2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 23 Jan 2018 22:55:18 -0500 Subject: [PATCH 01/11] ShouldFlush should include uncommitted docs condition --- .../elasticsearch/index/engine/Engine.java | 6 ++++ .../index/engine/InternalEngine.java | 5 +++ .../elasticsearch/index/shard/IndexShard.java | 7 ++--- .../indices/recovery/RecoveryTests.java | 31 +++++++++++++++++++ 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7feaeb63ac36f..dc58c39d6e5e7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -817,6 +817,12 @@ public final boolean refreshNeeded() { // NOTE: do NOT rename this to something containing flush or refresh! public abstract void writeIndexingBuffer() throws EngineException; + /** + * Checks if this engine should be flushed. + * This check is mainly based the uncommitted translog size and the translog threshold flush size setting. + */ + public abstract boolean shouldFlush(); + /** * Flushes the state of the engine including the transaction log, clearing memory. 
* diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 97a6403ec3b23..0440256d6dc72 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1462,6 +1462,11 @@ final boolean tryRenewSyncCommit() { return renewed; } + @Override + public boolean shouldFlush() { + return translog.shouldFlush() && indexWriter.hasUncommittedChanges(); + } + @Override public CommitId flush() throws EngineException { return flush(false, false); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3ace9ededc5b3..398953f2c80e9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1597,17 +1597,16 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Tests whether or not the translog should be flushed. This test is based on the current size of the translog comparted to the + * Tests whether or not the engine should be flushed. This test is based on the current size of the translog compared to the * configured flush threshold size. 
* - * @return {@code true} if the translog should be flushed + * @return {@code true} if the engine should be flushed */ boolean shouldFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - final Translog translog = engine.getTranslog(); - return translog.shouldFlush(); + return engine.shouldFlush(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 2089c36d06bc0..478730bc6bbf7 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; @@ -44,6 +45,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.threadpool.ThreadPoolStats; import java.util.HashMap; import java.util.List; @@ -306,4 +308,33 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { } } + public void testShouldFlushAfterFileBasedRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + long translogSizeOnPrimary = 0; + int numDocs = shards.indexDocs(between(10, 100)); + translogSizeOnPrimary += shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + shards.flush(); + + final IndexShard replica = shards.addReplica(); + IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData()); + long flushThreshold = RandomNumbers.randomLongBetween(random(), 
100, translogSizeOnPrimary); + builder.settings(Settings.builder().put(replica.indexSettings().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b") + ); + replica.indexSettings().updateIndexMetaData(builder.build()); + replica.onSettingsChanged(); + shards.recoverReplica(replica); + // Make sure there is no infinite loop of flushing. + assertBusy(() -> { + int tasks = 0; + for (ThreadPoolStats.Stats stats : replica.getThreadPool().stats()) { + tasks += stats.getThreads(); + } + assertThat(tasks, equalTo(0)); + }); + assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); + shards.assertAllEqual(numDocs); + } + } } From ad530f3c7ec080192af46a57a908f71b3d183e59 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 24 Jan 2018 13:12:31 -0500 Subject: [PATCH 02/11] consider shouldFlush condition in flush --- .../elasticsearch/index/engine/InternalEngine.java | 4 ++-- .../elasticsearch/indices/recovery/RecoveryTests.java | 11 ++--------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0440256d6dc72..89d5dc6a79fed 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1464,7 +1464,7 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldFlush() { - return translog.shouldFlush() && indexWriter.hasUncommittedChanges(); + return translog.shouldFlush(); } @Override @@ -1497,7 +1497,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti logger.trace("acquired flush lock immediately"); } try { - if (indexWriter.hasUncommittedChanges() || force) { + if (indexWriter.hasUncommittedChanges() || force || shouldFlush()) { ensureCanFlush(); try { translog.rollGeneration(); diff --git 
a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 478730bc6bbf7..5dbd9e2c04fe8 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -308,7 +308,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { } } - public void testShouldFlushAfterFileBasedRecovery() throws Exception { + public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); long translogSizeOnPrimary = 0; @@ -325,14 +325,7 @@ public void testShouldFlushAfterFileBasedRecovery() throws Exception { replica.indexSettings().updateIndexMetaData(builder.build()); replica.onSettingsChanged(); shards.recoverReplica(replica); - // Make sure there is no infinite loop of flushing. - assertBusy(() -> { - int tasks = 0; - for (ThreadPoolStats.Stats stats : replica.getThreadPool().stats()) { - tasks += stats.getThreads(); - } - assertThat(tasks, equalTo(0)); - }); + assertBusy(() -> assertThat(getEngine(replica).shouldFlush(), equalTo(false))); assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs); } From fe6901abb20097c1f7266a0ad27700b9421f0c73 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 24 Jan 2018 13:41:05 -0500 Subject: [PATCH 03/11] Add local checkpoint condition to shouldFlush --- .../org/elasticsearch/index/engine/InternalEngine.java | 9 ++++++++- .../elasticsearch/indices/recovery/RecoveryTests.java | 1 - 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 89d5dc6a79fed..6e431d6943b6d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ 
b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1464,7 +1464,14 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldFlush() { - return translog.shouldFlush(); + if (translog.shouldFlush() == false) { + return false; + } + // We should only flush iff the shouldFlush condition can become false after flushing. + final long localCheckpoint = localCheckpointTracker.getCheckpoint(); + final long translogGenFromLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + final long translogGenForNewCommit = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; + return translogGenForNewCommit > translogGenFromLastCommit; } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 5dbd9e2c04fe8..e14c2d69a0265 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -45,7 +45,6 @@ import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.threadpool.ThreadPoolStats; import java.util.HashMap; import java.util.List; From 9789bd7c87cad6e88c0ae3466baf6eb6edfe0c51 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 24 Jan 2018 14:28:29 -0500 Subject: [PATCH 04/11] add Engine#shouldFlush test --- .../index/engine/InternalEngineTests.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2a7e49aa66b61..f97233d5e1c61 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,6 +47,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; @@ -163,6 +165,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -4439,4 +4442,46 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); } } + + public void testShouldFlush() throws Exception { + assertThat("Empty engine does not need flushing", engine.shouldFlush(), equalTo(false)); + int numDocs = between(10, 100); + for (int id = 0; id < numDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + } + assertThat("Not exceeded translog flush threshold yet", engine.shouldFlush(), equalTo(false)); + long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes()); + final IndexSettings 
indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder() + .put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")) + .build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + int moreDocs = between(0, 10); + for (int id = numDocs; id < numDocs + moreDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + } + assertThat(engine.getTranslog().shouldFlush(), equalTo(true)); + assertThat(engine.shouldFlush(), equalTo(false)); + engine.rollTranslogGeneration(); + assertThat(engine.getTranslog().shouldFlush(), equalTo(true)); + assertThat(engine.shouldFlush(), equalTo(true)); + engine.flush(); + // We can flush with stale documents + for (int id = 0; id < numDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); + assertThat(result.isCreated(), equalTo(false)); + } + engine.getTranslog().rollGeneration(); // This is called automatically by IndexShard#afterWriteOperation + assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(numDocs + moreDocs - 1L)); + final SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); + assertThat(engine.shouldFlush(), equalTo(true)); + engine.flush(false, true); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + } } From 18d8fe606e4487965b5745fb1a5ea6c9187c9580 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 24 Jan 2018 14:34:26 -0500 Subject: [PATCH 05/11] grammar --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index dc58c39d6e5e7..d8b09924ddb90 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -819,7 +819,7 @@ public final boolean refreshNeeded() { /** * Checks if this engine should be flushed. - * This check is mainly based the uncommitted translog size and the translog threshold flush size setting. + * This check is mainly based on the uncommitted translog size and the translog flush threshold setting. */ public abstract boolean shouldFlush(); From 15e4edfc839f75016db6a16fa9e85ebfbab45bac Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 24 Jan 2018 22:10:13 -0500 Subject: [PATCH 06/11] should flush if local checkpoint = max_seqno --- .../index/engine/InternalEngine.java | 9 ++++++- .../index/engine/InternalEngineTests.java | 25 ++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6e431d6943b6d..83af562734669 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1467,8 +1467,15 @@ public boolean shouldFlush() { if (translog.shouldFlush() == false) { return false; } - // We should only flush iff the shouldFlush condition can become false after flushing. + /* + * We should only flush ony if the shouldFlush condition can become false after flushing. This condition will change if: + * 1. The min translog gen of the next commit points to a different translog gen than the last commit + * 2. 
If Local checkpoint equals to max_seqno, the min translog gen of the next commit will point to the newly rolled generation + */ final long localCheckpoint = localCheckpointTracker.getCheckpoint(); + if (localCheckpoint == localCheckpointTracker.getMaxSeqNo()) { + return true; + } final long translogGenFromLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long translogGenForNewCommit = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; return translogGenForNewCommit > translogGenFromLastCommit; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f97233d5e1c61..0ba0712693a7f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4465,23 +4465,30 @@ public void testShouldFlush() throws Exception { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); } - assertThat(engine.getTranslog().shouldFlush(), equalTo(true)); - assertThat(engine.shouldFlush(), equalTo(false)); - engine.rollTranslogGeneration(); - assertThat(engine.getTranslog().shouldFlush(), equalTo(true)); assertThat(engine.shouldFlush(), equalTo(true)); engine.flush(); - // We can flush with stale documents + // No gap - can flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); assertThat(result.isCreated(), equalTo(false)); } - engine.getTranslog().rollGeneration(); // This is called automatically by IndexShard#afterWriteOperation - assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), 
equalTo(numDocs + moreDocs - 1L)); - final SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); + SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); assertThat(engine.shouldFlush(), equalTo(true)); - engine.flush(false, true); + engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + // With gap - can not flush + final long maxSeqNo = engine.getLocalCheckpointTracker().generateSeqNo(); + for (int id = 0; id < numDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); + assertThat(result.isCreated(), equalTo(false)); + } + assertThat(engine.shouldFlush(), equalTo(false)); + // Fill gap - can flush again + final ParsedDocument doc = testParsedDocument(Long.toString(maxSeqNo), null, testDocumentWithTextField(), SOURCE, null); + final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, maxSeqNo, false)); + assertThat(result.isCreated(), equalTo(true)); + assertThat(engine.shouldFlush(), equalTo(true)); } } From f53045e5b816e574903dce58227f574c177b0a6d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 25 Jan 2018 10:26:20 -0500 Subject: [PATCH 07/11] Compare current uncommittedSize to the next uncommittedSize --- .../elasticsearch/index/engine/Engine.java | 4 +- .../index/engine/InternalEngine.java | 27 +++++++------ .../elasticsearch/index/shard/IndexShard.java | 6 +-- .../index/translog/Translog.java | 13 +----- .../index/engine/InternalEngineTests.java | 40 ++++++------------- .../indices/recovery/RecoveryTests.java | 2 +- 6 files changed, 33 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index d8b09924ddb90..a947bc992f390 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -818,10 +818,10 @@ public final boolean refreshNeeded() { public abstract void writeIndexingBuffer() throws EngineException; /** - * Checks if this engine should be flushed. + * Checks if this engine should be flushed to free translog. * This check is mainly based on the uncommitted translog size and the translog flush threshold setting. */ - public abstract boolean shouldFlush(); + public abstract boolean shouldFlushToFreeTranslog(); /** * Flushes the state of the engine including the transaction log, clearing memory. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 83af562734669..89abd781383ae 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1463,22 +1463,21 @@ final boolean tryRenewSyncCommit() { } @Override - public boolean shouldFlush() { - if (translog.shouldFlush() == false) { + public boolean shouldFlushToFreeTranslog() { + ensureOpen(); + final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); + final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); + if (uncommittedSizeOfCurrentCommit < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. This condition will change if: - * 1. The min translog gen of the next commit points to a different translog gen than the last commit - * 2. If Local checkpoint equals to max_seqno, the min translog gen of the next commit will point to the newly rolled generation + * We should only flush if the shouldFlush condition can become false after flushing.
+ * This condition will change if the `uncommittedSize` of the new commit is smaller than + * the `uncommittedSize` of the current commit. This method is to maintain translog only, + * thus the IndexWriter#hasUncommittedChanges condition is not considered. */ - final long localCheckpoint = localCheckpointTracker.getCheckpoint(); - if (localCheckpoint == localCheckpointTracker.getMaxSeqNo()) { - return true; - } - final long translogGenFromLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); - final long translogGenForNewCommit = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; - return translogGenForNewCommit > translogGenFromLastCommit; + final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); + return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; } @Override @@ -1511,7 +1510,9 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti logger.trace("acquired flush lock immediately"); } try { - if (indexWriter.hasUncommittedChanges() || force || shouldFlush()) { + // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the + // newly created commit points to a different translog generation (can free translog) + if (indexWriter.hasUncommittedChanges() || force || shouldFlushToFreeTranslog()) { ensureCanFlush(); try { translog.rollGeneration(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 398953f2c80e9..004a1bf3465e1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1597,8 +1597,8 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Tests whether or not the engine should be flushed. 
This test is based on the current size of the translog compared to the - * configured flush threshold size. + * Tests whether or not the engine should be flushed to free translog. + * This test is based on the current size of the translog compared to the configured flush threshold size. * * @return {@code true} if the engine should be flushed */ @@ -1606,7 +1606,7 @@ boolean shouldFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.shouldFlush(); + return engine.shouldFlushToFreeTranslog(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index b4bf6173f74cf..3cbc8fc530539 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -436,7 +436,7 @@ private long sizeInBytesByMinGen(long minGeneration) { /** * Returns the size in bytes of the translog files with ops above the given seqNo */ - private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { + public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); @@ -523,17 +523,6 @@ public Location add(final Operation operation) throws IOException { } } - /** - * Tests whether or not the translog should be flushed. This test is based on the current size - * of the translog comparted to the configured flush threshold size. - * - * @return {@code true} if the translog should be flushed - */ - public boolean shouldFlush() { - final long size = this.uncommittedSizeInBytes(); - return size > this.indexSettings.getFlushThresholdSize().getBytes(); - } - /** * Tests whether or not the translog generation should be rolled to a new generation. 
This test * is based on the size of the current generation compared to the configured generation diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0ba0712693a7f..d8538bee7cafd 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4443,52 +4443,36 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } } - public void testShouldFlush() throws Exception { - assertThat("Empty engine does not need flushing", engine.shouldFlush(), equalTo(false)); + public void testShouldFlushToFreeTranslog() throws Exception { + assertThat("Empty engine does not need flushing", engine.shouldFlushToFreeTranslog(), equalTo(false)); int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); } - assertThat("Not exceeded translog flush threshold yet", engine.shouldFlush(), equalTo(false)); + assertThat("Not exceeded translog flush threshold yet", engine.shouldFlushToFreeTranslog(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes()); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) - .settings(Settings.builder() - .put(indexSettings.getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")) - .build(); + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); 
indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - int moreDocs = between(0, 10); - for (int id = numDocs; id < numDocs + moreDocs; id++) { - final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); - engine.index(indexForDoc(doc)); - } - assertThat(engine.shouldFlush(), equalTo(true)); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.shouldFlushToFreeTranslog(), equalTo(true)); engine.flush(); - // No gap - can flush + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.shouldFlush(), equalTo(true)); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.shouldFlushToFreeTranslog(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - // With gap - can not flush - final long maxSeqNo = engine.getLocalCheckpointTracker().generateSeqNo(); - for (int id = 0; id < numDocs; id++) { - final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); - final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); - assertThat(result.isCreated(), equalTo(false)); - } - assertThat(engine.shouldFlush(), equalTo(false)); - // Fill gap - can flush again - final ParsedDocument doc = testParsedDocument(Long.toString(maxSeqNo), null, testDocumentWithTextField(), SOURCE, 
null); - final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, maxSeqNo, false)); - assertThat(result.isCreated(), equalTo(true)); - assertThat(engine.shouldFlush(), equalTo(true)); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index e14c2d69a0265..29444e9c9a6cb 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -324,7 +324,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { replica.indexSettings().updateIndexMetaData(builder.build()); replica.onSettingsChanged(); shards.recoverReplica(replica); - assertBusy(() -> assertThat(getEngine(replica).shouldFlush(), equalTo(false))); + assertBusy(() -> assertThat(getEngine(replica).shouldFlushToFreeTranslog(), equalTo(false))); assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs); } From ef7f7137242106d383fc70cf1a78b15bd472301a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 25 Jan 2018 11:16:44 -0500 Subject: [PATCH 08/11] do not flush iff there is no op --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 89abd781383ae..0e225b9cc2044 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1467,7 +1467,8 @@ public boolean shouldFlushToFreeTranslog() { ensureOpen(); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); final long 
uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + // If flushThreshold is too small, we may continuously flush even there is no uncommitted operations. + if (uncommittedSizeOfCurrentCommit < flushThreshold || translog.uncommittedOperations() == 0) { return false; } /* From 36cc8bceaac9ed970ff86f07a386b972883a1775 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 25 Jan 2018 12:39:15 -0500 Subject: [PATCH 09/11] -> shouldPeriodicallyFlush --- .../org/elasticsearch/index/engine/Engine.java | 4 ++-- .../index/engine/InternalEngine.java | 4 ++-- .../elasticsearch/index/shard/IndexShard.java | 10 +++++----- .../index/engine/InternalEngineTests.java | 10 +++++----- .../elasticsearch/index/shard/IndexShardIT.java | 16 +++++++--------- .../indices/recovery/RecoveryTests.java | 2 +- 6 files changed, 22 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a947bc992f390..eea63dec94bf2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -818,10 +818,10 @@ public final boolean refreshNeeded() { public abstract void writeIndexingBuffer() throws EngineException; /** - * Checks if this engine should be flushed to free translog. + * Checks if this engine should be flushed periodically. * This check is mainly based on the uncommitted translog size and the translog flush threshold setting. */ - public abstract boolean shouldFlushToFreeTranslog(); + public abstract boolean shouldPeriodicallyFlush(); /** * Flushes the state of the engine including the transaction log, clearing memory. 
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0e225b9cc2044..53f3aa5c58f40 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1463,7 +1463,7 @@ final boolean tryRenewSyncCommit() { } @Override - public boolean shouldFlushToFreeTranslog() { + public boolean shouldPeriodicallyFlush() { ensureOpen(); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); @@ -1513,7 +1513,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti try { // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the // newly created commit points to a different translog generation (can free translog) - if (indexWriter.hasUncommittedChanges() || force || shouldFlushToFreeTranslog()) { + if (indexWriter.hasUncommittedChanges() || force || shouldPeriodicallyFlush()) { ensureCanFlush(); try { translog.rollGeneration(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 004a1bf3465e1..92ca1b9748c4c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1597,16 +1597,16 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Tests whether or not the engine should be flushed to free translog. + * Tests whether or not the engine should be flushed periodically. * This test is based on the current size of the translog compared to the configured flush threshold size. 
* * @return {@code true} if the engine should be flushed */ - boolean shouldFlush() { + boolean shouldPeriodicallyFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.shouldFlushToFreeTranslog(); + return engine.shouldPeriodicallyFlush(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } @@ -2360,7 +2360,7 @@ public Translog.Durability getTranslogDurability() { * executed asynchronously on the flush thread pool. */ public void afterWriteOperation() { - if (shouldFlush() || shouldRollTranslogGeneration()) { + if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { if (flushOrRollRunning.compareAndSet(false, true)) { /* * We have to check again since otherwise there is a race when a thread passes the first check next to another thread which @@ -2370,7 +2370,7 @@ public void afterWriteOperation() { * Additionally, a flush implicitly executes a translog generation roll so if we execute a flush then we do not need to * check if we should roll the translog generation. 
*/ - if (shouldFlush()) { + if (shouldPeriodicallyFlush()) { logger.debug("submitting async flush request"); final AbstractRunnable flush = new AbstractRunnable() { @Override diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d8538bee7cafd..d375790a1cc74 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4443,14 +4443,14 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } } - public void testShouldFlushToFreeTranslog() throws Exception { - assertThat("Empty engine does not need flushing", engine.shouldFlushToFreeTranslog(), equalTo(false)); + public void testShouldPeriodicallyFlush() throws Exception { + assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); } - assertThat("Not exceeded translog flush threshold yet", engine.shouldFlushToFreeTranslog(), equalTo(false)); + assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes()); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) @@ -4459,7 +4459,7 @@ public void testShouldFlushToFreeTranslog() throws Exception { indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); - 
assertThat(engine.shouldFlushToFreeTranslog(), equalTo(true)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush @@ -4470,7 +4470,7 @@ public void testShouldFlushToFreeTranslog() throws Exception { } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); - assertThat(engine.shouldFlushToFreeTranslog(), equalTo(true)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index e02b6c04a89d3..601eb8e9b1d66 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -73,7 +72,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.io.UncheckedIOException; @@ -332,23 +330,23 @@ public void testMaybeFlush() throws Exception { 
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); client().prepareIndex("test", "test", "0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); - assertTrue(shard.shouldFlush()); + assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); assertEquals(2, translog.uncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); }); assertEquals(0, translog.uncommittedOperations()); translog.sync(); @@ -364,7 +362,7 @@ public void testMaybeFlush() throws Exception { assertBusy(() -> { // this is async logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), translog.uncommittedOperations(), translog.getGeneration()); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); }); assertEquals(0, translog.uncommittedOperations()); } @@ -408,7 +406,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); final String key; final boolean flush = randomBoolean(); if (flush) { @@ -423,7 +421,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { .setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE) .get(); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); final AtomicBoolean running = new AtomicBoolean(true); final int numThreads = randomIntBetween(2, 4); final Thread[] threads = new Thread[numThreads]; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 29444e9c9a6cb..42fa026cc4d18 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -324,7 +324,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { replica.indexSettings().updateIndexMetaData(builder.build()); replica.onSettingsChanged(); shards.recoverReplica(replica); - assertBusy(() -> assertThat(getEngine(replica).shouldFlushToFreeTranslog(), equalTo(false))); + assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs); } From 9bc74b34e4b1800d0c440699fc3d8aa2182c8a56 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 25 Jan 2018 13:12:24 -0500 Subject: [PATCH 10/11] Comment for uncommited operation checking --- .../elasticsearch/index/engine/InternalEngine.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 53f3aa5c58f40..c98b7763d1dbd 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1467,8 +1467,7 @@ public boolean shouldPeriodicallyFlush() { ensureOpen(); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); final long 
uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - // If flushThreshold is too small, we may continuously flush even there is no uncommitted operations. - if (uncommittedSizeOfCurrentCommit < flushThreshold || translog.uncommittedOperations() == 0) { + if (uncommittedSizeOfCurrentCommit < flushThreshold) { return false; } /* @@ -1478,7 +1477,14 @@ public boolean shouldPeriodicallyFlush() { * thus the IndexWriter#hasUncommittedChanges condition is not considered. */ final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; + /* + * If flushThreshold is too small, we may repeatedly flush even if there is no uncommitted operation + * as #sizeOfGensAboveSeqNoInBytes and #uncommittedSizeInBytes can return different values. + * An empty translog file has non-zero `uncommittedSize` (the translog header), and method #sizeOfGensAboveSeqNoInBytes can + * return 0 now (no translog gen contains ops above local checkpoint) but method #uncommittedSizeInBytes will return an actual + * non-zero value after rolling a new translog generation. This can be avoided by checking the actual uncommitted operations. 
+ */ return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit && translog.uncommittedOperations() > 0; } @Override From bcf3704d8d931189f36624ba90f5ade2c3ab6e0b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 25 Jan 2018 13:19:40 -0500 Subject: [PATCH 11/11] add comment for recovery test --- .../org/elasticsearch/indices/recovery/RecoveryTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 42fa026cc4d18..69176b03942f6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -307,12 +307,15 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { } } + /** + * This test makes sure that there is no infinite loop of flushing (the condition `shouldPeriodicallyFlush` is eventually false) + * in peer-recovery if a primary sends a fully-baked index commit. + */ public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); - long translogSizeOnPrimary = 0; int numDocs = shards.indexDocs(between(10, 100)); - translogSizeOnPrimary += shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica(); @@ -324,6 +327,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { replica.indexSettings().updateIndexMetaData(builder.build()); replica.onSettingsChanged(); shards.recoverReplica(replica); + // Make sure the flushing will eventually be completed (i.e. 
`shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs);