diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7feaeb63ac36f..eea63dec94bf2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -817,6 +817,12 @@ public final boolean refreshNeeded() { // NOTE: do NOT rename this to something containing flush or refresh! public abstract void writeIndexingBuffer() throws EngineException; + /** + * Checks if this engine should be flushed periodically. + * This check is mainly based on the uncommitted translog size and the translog flush threshold setting. + */ + public abstract boolean shouldPeriodicallyFlush(); + /** * Flushes the state of the engine including the transaction log, clearing memory. * diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 97a6403ec3b23..c98b7763d1dbd 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1462,6 +1462,31 @@ final boolean tryRenewSyncCommit() { return renewed; } + @Override + public boolean shouldPeriodicallyFlush() { + ensureOpen(); + final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); + final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); + if (uncommittedSizeOfCurrentCommit < flushThreshold) { + return false; + } + /* + * We should only flush ony if the shouldFlush condition can become false after flushing. + * This condition will change if the `uncommittedSize` of the new commit is smaller than + * the `uncommittedSize` of the current commit. This method is to maintain translog only, + * thus the IndexWriter#hasUncommittedChanges condition is not considered. + */ + final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); + /* + * If flushThreshold is too small, we may repeatedly flush even there is no uncommitted operation + * as #sizeOfGensAboveSeqNoInByte and #uncommittedSizeInBytes can return different values. + * An empty translog file has non-zero `uncommittedSize` (the translog header), and method #sizeOfGensAboveSeqNoInBytes can + * return 0 now(no translog gen contains ops above local checkpoint) but method #uncommittedSizeInBytes will return an actual + * non-zero value after rolling a new translog generation. This can be avoided by checking the actual uncommitted operations. + */ + return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit && translog.uncommittedOperations() > 0; + } + @Override public CommitId flush() throws EngineException { return flush(false, false); @@ -1492,7 +1517,9 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti logger.trace("acquired flush lock immediately"); } try { - if (indexWriter.hasUncommittedChanges() || force) { + // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the + // newly created commit points to a different translog generation (can free translog) + if (indexWriter.hasUncommittedChanges() || force || shouldPeriodicallyFlush()) { ensureCanFlush(); try { translog.rollGeneration(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 25e7a79c02b1e..711fe68bf6593 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1600,17 +1600,16 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Tests whether or not the translog should be flushed. This test is based on the current size of the translog comparted to the - * configured flush threshold size. + * Tests whether or not the engine should be flushed periodically. + * This test is based on the current size of the translog compared to the configured flush threshold size. * - * @return {@code true} if the translog should be flushed + * @return {@code true} if the engine should be flushed */ - boolean shouldFlush() { + boolean shouldPeriodicallyFlush() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - final Translog translog = engine.getTranslog(); - return translog.shouldFlush(); + return engine.shouldPeriodicallyFlush(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } @@ -2364,7 +2363,7 @@ public Translog.Durability getTranslogDurability() { * executed asynchronously on the flush thread pool. */ public void afterWriteOperation() { - if (shouldFlush() || shouldRollTranslogGeneration()) { + if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { if (flushOrRollRunning.compareAndSet(false, true)) { /* * We have to check again since otherwise there is a race when a thread passes the first check next to another thread which @@ -2374,7 +2373,7 @@ public void afterWriteOperation() { * Additionally, a flush implicitly executes a translog generation roll so if we execute a flush then we do not need to * check if we should roll the translog generation. */ - if (shouldFlush()) { + if (shouldPeriodicallyFlush()) { logger.debug("submitting async flush request"); final AbstractRunnable flush = new AbstractRunnable() { @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index b4bf6173f74cf..3cbc8fc530539 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -436,7 +436,7 @@ private long sizeInBytesByMinGen(long minGeneration) { /** * Returns the size in bytes of the translog files with ops above the given seqNo */ - private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { + public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); @@ -523,17 +523,6 @@ public Location add(final Operation operation) throws IOException { } } - /** - * Tests whether or not the translog should be flushed. This test is based on the current size - * of the translog comparted to the configured flush threshold size. - * - * @return {@code true} if the translog should be flushed - */ - public boolean shouldFlush() { - final long size = this.uncommittedSizeInBytes(); - return size > this.indexSettings.getFlushThresholdSize().getBytes(); - } - /** * Tests whether or not the translog generation should be rolled to a new generation. This test * is based on the size of the current generation compared to the configured generation diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2a7e49aa66b61..d375790a1cc74 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,6 +47,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; @@ -163,6 +165,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -4439,4 +4442,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); } } + + public void testShouldPeriodicallyFlush() throws Exception { + assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); + int numDocs = between(10, 100); + for (int id = 0; id < numDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + } + assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); + long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes()); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); + engine.flush(); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + // Stale operations skipped by Lucene but added to translog - still able to flush + for (int id = 0; id < numDocs; id++) { + final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); + final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false)); + assertThat(result.isCreated(), equalTo(false)); + } + SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); + engine.flush(false, false); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index e02b6c04a89d3..601eb8e9b1d66 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -73,7 +72,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.io.UncheckedIOException; @@ -332,23 +330,23 @@ public void testMaybeFlush() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); client().prepareIndex("test", "test", "0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); - assertTrue(shard.shouldFlush()); + assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); assertEquals(2, translog.uncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); }); assertEquals(0, translog.uncommittedOperations()); translog.sync(); @@ -364,7 +362,7 @@ public void testMaybeFlush() throws Exception { assertBusy(() -> { // this is async logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), translog.uncommittedOperations(), translog.getGeneration()); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); }); assertEquals(0, translog.uncommittedOperations()); } @@ -408,7 +406,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); final String key; final boolean flush = randomBoolean(); if (flush) { @@ -423,7 +421,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { .setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE) .get(); - assertFalse(shard.shouldFlush()); + assertFalse(shard.shouldPeriodicallyFlush()); final AtomicBoolean running = new AtomicBoolean(true); final int numThreads = randomIntBetween(2, 4); final Thread[] threads = new Thread[numThreads]; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 2089c36d06bc0..69176b03942f6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; @@ -306,4 +307,30 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { } } + /** + * This test makes sure that there is no infinite loop of flushing (the condition `shouldPeriodicallyFlush` eventually is false) + * in peer-recovery if a primary sends a fully-baked index commit. + */ + public void testShouldFlushAfterPeerRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + int numDocs = shards.indexDocs(between(10, 100)); + final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + shards.flush(); + + final IndexShard replica = shards.addReplica(); + IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData()); + long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, translogSizeOnPrimary); + builder.settings(Settings.builder().put(replica.indexSettings().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b") + ); + replica.indexSettings().updateIndexMetaData(builder.build()); + replica.onSettingsChanged(); + shards.recoverReplica(replica); + // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) + assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); + assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); + shards.assertAllEqual(numDocs); + } + } }