From 5d46eee62c53542c973a6435ff556cd922634210 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 6 Apr 2018 10:10:51 +0100 Subject: [PATCH] Introduce DocHistoryEntry Today, each test has its own process for deciding whether to refresh/flush/GC deletes after applying an operation. This change moves this decision into generateSingleDocHistory(). --- .../index/engine/InternalEngineTests.java | 92 ++++++++++++------- 1 file changed, 60 insertions(+), 32 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9cdc68444ea16..38fca74272ac4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1384,11 +1384,26 @@ public void testVersioningCreateExistsException() throws IOException { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { + private class DocHistoryEntry { + final Engine.Operation op; + final boolean refreshAfterOperation; + final boolean flushAfterOperation; + final boolean gcDeletesAfterOperation; + + private DocHistoryEntry(Engine.Operation op, boolean refreshAfterOperation, + boolean flushAfterOperation, boolean gcDeletesAfterOperation) { + this.op = op; + this.refreshAfterOperation = refreshAfterOperation; + this.flushAfterOperation = flushAfterOperation; + this.gcDeletesAfterOperation = gcDeletesAfterOperation; + } + } + + protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, + boolean partialOldPrimary, long primaryTerm, + int minOpCount, int maxOpCount) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); - final List ops = new ArrayList<>(); + final List ops = new ArrayList<>(); final Term id = newUid("1"); final int startWithSeqNo; if (partialOldPrimary) { @@ -1435,19 +1450,19 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } - ops.add(op); + ops.add(new DocHistoryEntry(op, randomBoolean(), randomBoolean(), rarely())); } return ops; } public void testOutOfOrderDocsOnReplica() throws IOException { - final List ops = generateSingleDocHistory(true, + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); assertOpsOnReplica(ops, replicaEngine, true); } - private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { - final Engine.Operation lastOp = ops.get(ops.size() - 1); + private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1458,7 +1473,7 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } if (shuffleOps) { int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).op.seqNo() < 0) { firstOpWithSeqNo++; } // shuffle ops but make sure legacy ops are first @@ -1466,7 +1481,8 @@ private void assertOpsOnReplica(List ops, InternalEngine repli shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); } boolean firstOp = true; - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1491,13 +1507,14 @@ private void assertOpsOnReplica(List ops, InternalEngine repli assertThat(result.getVersion(), equalTo(op.version())); assertThat(result.hasFailure(), equalTo(false)); } - if (randomBoolean()) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } - if (randomBoolean()) { + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } + // TODO GC deletes? firstOp = false; } @@ -1512,8 +1529,8 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1535,7 +1552,7 @@ public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedEx } } - private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { + private void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { Thread[] thread = new Thread[randomIntBetween(3, 5)]; CountDownLatch startGun = new CountDownLatch(thread.length); AtomicInteger offset = new AtomicInteger(-1); @@ -1550,15 +1567,21 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng int docOffset; while ((docOffset = offset.incrementAndGet()) < ops.size()) { try { - final Engine.Operation op = ops.get(docOffset); + final DocHistoryEntry docHistoryEntry = ops.get(docOffset); + final Engine.Operation op = docHistoryEntry.op; if (op instanceof Engine.Index) { engine.index((Engine.Index) op); } else { engine.delete((Engine.Delete) op); } - if ((docOffset + 1) % 4 == 0) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } + if (docHistoryEntry.flushAfterOperation) { + engine.flush(); + engine.refresh("test"); + } + // TODO GC deletes? } catch (IOException e) { throw new AssertionError(e); } @@ -1572,11 +1595,11 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng } public void testInternalVersioningOnPrimary() throws IOException { - final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + final List ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } - private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) + private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) throws IOException { String lastFieldValue = null; int opsPerformed = 0; @@ -1586,7 +1609,8 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion index.getAutoGeneratedIdTimestamp(), index.isRetry()); BiFunction delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime()); - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; final boolean versionConflict = rarely(); final boolean versionedOp = versionConflict || randomBoolean(); final long conflictingVersion = docDeleted || randomBoolean() ? @@ -1650,12 +1674,14 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion } } } - if (randomBoolean()) { + + // TODO pure refresh? + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } - if (rarely()) { + if (docHistoryEntry.gcDeletesAfterOperation) { // simulate GC deletes engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL); engine.clearDeletedTombstones(); @@ -1680,8 +1706,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { final Set nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); nonInternalVersioning.remove(VersionType.INTERNAL); final VersionType versionType = randomFrom(nonInternalVersioning); - final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp; @@ -1697,7 +1723,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException { long highestOpVersion = Versions.NOT_FOUND; long seqNo = -1; boolean docDeleted = true; - for (Engine.Operation op : ops) { + for (final DocHistoryEntry docHistoryEntry : ops) { + final Engine.Operation op = docHistoryEntry.op; logger.info("performing [{}], v [{}], seq# [{}], term [{}]", op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { @@ -1737,13 +1764,14 @@ public void testNonInternalVersioningOnPrimary() throws IOException { assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); } } - if (randomBoolean()) { + if (docHistoryEntry.refreshAfterOperation) { engine.refresh("test"); } - if (randomBoolean()) { + if (docHistoryEntry.flushAfterOperation) { engine.flush(); engine.refresh("test"); } + // TODO GC deletes? } assertVisibleCount(engine, docDeleted ? 0 : 1); @@ -1758,9 +1786,9 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } public void testVersioningPromotedReplica() throws IOException { - final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); - List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); - Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); + final List replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); + List primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); + Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1).op; final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); @@ -1779,8 +1807,8 @@ public void testVersioningPromotedReplica() throws IOException { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); - final Engine.Operation lastOp = ops.get(ops.size() - 1); + final List ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1).op; final String lastFieldValue; if (lastOp instanceof Engine.Index) { Engine.Index index = (Engine.Index) lastOp;