Skip to content

Commit

Permalink
Introduce DocHistoryEntry
Browse files Browse the repository at this point in the history
Today, each test has its own process for deciding whether to refresh/flush/GC
deletes after applying an operation. This change moves this decision into
generateSingleDocHistory().
  • Loading branch information
DaveCTurner committed Apr 6, 2018
1 parent d53649b commit 5d46eee
Showing 1 changed file with 60 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1384,11 +1384,26 @@ public void testVersioningCreateExistsException() throws IOException {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}

protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
private class DocHistoryEntry {
final Engine.Operation op;
final boolean refreshAfterOperation;
final boolean flushAfterOperation;
final boolean gcDeletesAfterOperation;

private DocHistoryEntry(Engine.Operation op, boolean refreshAfterOperation,
boolean flushAfterOperation, boolean gcDeletesAfterOperation) {
this.op = op;
this.refreshAfterOperation = refreshAfterOperation;
this.flushAfterOperation = flushAfterOperation;
this.gcDeletesAfterOperation = gcDeletesAfterOperation;
}
}

protected List<DocHistoryEntry> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final List<DocHistoryEntry> ops = new ArrayList<>();
final Term id = newUid("1");
final int startWithSeqNo;
if (partialOldPrimary) {
Expand Down Expand Up @@ -1435,19 +1450,19 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
}
ops.add(op);
ops.add(new DocHistoryEntry(op, randomBoolean(), randomBoolean(), rarely()));
}
return ops;
}

public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
final List<DocHistoryEntry> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}

private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
private void assertOpsOnReplica(List<DocHistoryEntry> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1).op;
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
Expand All @@ -1458,15 +1473,16 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
}
if (shuffleOps) {
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).op.seqNo() < 0) {
firstOpWithSeqNo++;
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
boolean firstOp = true;
for (Engine.Operation op : ops) {
for (final DocHistoryEntry docHistoryEntry : ops) {
final Engine.Operation op = docHistoryEntry.op;
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
Expand All @@ -1491,13 +1507,14 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
assertThat(result.getVersion(), equalTo(op.version()));
assertThat(result.hasFailure(), equalTo(false));
}
if (randomBoolean()) {
if (docHistoryEntry.refreshAfterOperation) {
engine.refresh("test");
}
if (randomBoolean()) {
if (docHistoryEntry.flushAfterOperation) {
engine.flush();
engine.refresh("test");
}
// TODO GC deletes?
firstOp = false;
}

Expand All @@ -1512,8 +1529,8 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
}

public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final List<DocHistoryEntry> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1).op;
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
Expand All @@ -1535,7 +1552,7 @@ public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedEx
}
}

private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
private void concurrentlyApplyOps(List<DocHistoryEntry> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
Expand All @@ -1550,15 +1567,21 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
int docOffset;
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
try {
final Engine.Operation op = ops.get(docOffset);
final DocHistoryEntry docHistoryEntry = ops.get(docOffset);
final Engine.Operation op = docHistoryEntry.op;
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else {
engine.delete((Engine.Delete) op);
}
if ((docOffset + 1) % 4 == 0) {
if (docHistoryEntry.refreshAfterOperation) {
engine.refresh("test");
}
if (docHistoryEntry.flushAfterOperation) {
engine.flush();
engine.refresh("test");
}
// TODO GC deletes?
} catch (IOException e) {
throw new AssertionError(e);
}
Expand All @@ -1572,11 +1595,11 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
}

public void testInternalVersioningOnPrimary() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
final List<DocHistoryEntry> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
}

private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
private int assertOpsOnPrimary(List<DocHistoryEntry> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
throws IOException {
String lastFieldValue = null;
int opsPerformed = 0;
Expand All @@ -1586,7 +1609,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
index.getAutoGeneratedIdTimestamp(), index.isRetry());
BiFunction<Long, Engine.Delete, Engine.Delete> delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(),
delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime());
for (Engine.Operation op : ops) {
for (final DocHistoryEntry docHistoryEntry : ops) {
final Engine.Operation op = docHistoryEntry.op;
final boolean versionConflict = rarely();
final boolean versionedOp = versionConflict || randomBoolean();
final long conflictingVersion = docDeleted || randomBoolean() ?
Expand Down Expand Up @@ -1650,12 +1674,14 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
}
}
}
if (randomBoolean()) {

// TODO pure refresh?
if (docHistoryEntry.flushAfterOperation) {
engine.flush();
engine.refresh("test");
}

if (rarely()) {
if (docHistoryEntry.gcDeletesAfterOperation) {
// simulate GC deletes
engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL);
engine.clearDeletedTombstones();
Expand All @@ -1680,8 +1706,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
nonInternalVersioning.remove(VersionType.INTERNAL);
final VersionType versionType = randomFrom(nonInternalVersioning);
final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final List<DocHistoryEntry> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final Engine.Operation lastOp = ops.get(ops.size() - 1).op;
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
Expand All @@ -1697,7 +1723,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
long highestOpVersion = Versions.NOT_FOUND;
long seqNo = -1;
boolean docDeleted = true;
for (Engine.Operation op : ops) {
for (final DocHistoryEntry docHistoryEntry : ops) {
final Engine.Operation op = docHistoryEntry.op;
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
Expand Down Expand Up @@ -1737,13 +1764,14 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class));
}
}
if (randomBoolean()) {
if (docHistoryEntry.refreshAfterOperation) {
engine.refresh("test");
}
if (randomBoolean()) {
if (docHistoryEntry.flushAfterOperation) {
engine.flush();
engine.refresh("test");
}
// TODO GC deletes?
}

assertVisibleCount(engine, docDeleted ? 0 : 1);
Expand All @@ -1758,9 +1786,9 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
}

public void testVersioningPromotedReplica() throws IOException {
final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
final List<DocHistoryEntry> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
List<DocHistoryEntry> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1).op;
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
Expand All @@ -1779,8 +1807,8 @@ public void testVersioningPromotedReplica() throws IOException {
}

public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final List<DocHistoryEntry> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1).op;
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
Expand Down

0 comments on commit 5d46eee

Please sign in to comment.