diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java index e31d7bcb..30b28ef3 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java @@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0); String namespace = getFullCollectionNamespace(collectionName); - Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation); + Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation); return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/EmptyCursor.java b/core/src/main/java/de/bwaldvogel/mongo/backend/EmptyCursor.java index 0e23751b..44258c92 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/EmptyCursor.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/EmptyCursor.java @@ -4,8 +4,9 @@ import java.util.NoSuchElementException; import de.bwaldvogel.mongo.bson.Document; +import de.bwaldvogel.mongo.oplog.OplogPosition; -public class EmptyCursor extends AbstractCursor { +public class EmptyCursor extends AbstractCursor implements TailableCursor { private static final long EMPTY_CURSOR_ID = 0L; @@ -33,4 +34,9 @@ public List takeDocuments(int numberToReturn) { public String toString() { return getClass().getSimpleName() + "()"; } + + @Override + public OplogPosition getPosition() { + return null; + } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/TailableCursor.java b/core/src/main/java/de/bwaldvogel/mongo/backend/TailableCursor.java new file mode 100644 index 00000000..5ea129da --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/TailableCursor.java @@ -0,0 +1,9 @@ +package de.bwaldvogel.mongo.backend; + +import de.bwaldvogel.mongo.oplog.OplogPosition; + +public interface TailableCursor extends Cursor { + + OplogPosition getPosition(); + +} diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java new file mode 100644 index 00000000..d47808a4 --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java @@ -0,0 +1,143 @@ +package de.bwaldvogel.mongo.oplog; + +import java.util.List; +import java.util.stream.Collectors; + +import de.bwaldvogel.mongo.MongoBackend; +import de.bwaldvogel.mongo.backend.TailableCursor; +import de.bwaldvogel.mongo.backend.aggregation.Aggregation; +import de.bwaldvogel.mongo.bson.BsonTimestamp; +import de.bwaldvogel.mongo.bson.Document; +import de.bwaldvogel.mongo.exception.MongoServerException; + +public class ChangeStreamCursor implements TailableCursor { + + private static final String FULL_DOCUMENT = "fullDocument"; + private static final String OPERATION_TYPE = "operationType"; + private static final String CLUSTER_TIME = "clusterTime"; + private static final String DOCUMENT_KEY = "documentKey"; + + private final MongoBackend mongoBackend; + private final Document changeStreamDocument; + private final Aggregation aggregation; + private final OplogCursor oplogCursor; + + ChangeStreamCursor( + MongoBackend mongoBackend, + Document changeStreamDocument, + Aggregation aggregation, + OplogCursor oplogCursor + ) { + this.mongoBackend = mongoBackend; + this.changeStreamDocument = changeStreamDocument; + this.aggregation = aggregation; + this.oplogCursor = oplogCursor; + } + + @Override + public long getId() { + return oplogCursor.getId(); + } + + @Override + public boolean isEmpty() { + return oplogCursor.isEmpty(); + } + + @Override + public List takeDocuments(int numberToReturn) { + return aggregation.runStagesAsStream( + oplogCursor.takeDocuments(numberToReturn).stream() + .map(this::toChangeStreamResponseDocument) + ).collect(Collectors.toList()); + } + + @Override + public OplogPosition getPosition() { + return oplogCursor.getPosition(); + } + + private Document toChangeStreamResponseDocument(Document oplogDocument) { + OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString()); + Document documentKey = new Document(); + Document document = getUpdateDocument(oplogDocument); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument); + OplogPosition oplogPosition = new OplogPosition(timestamp); + switch (operationType) { + case UPDATE: + case DELETE: + documentKey = document; + break; + case INSERT: + documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID)); + break; + case COMMAND: + return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp); + default: + throw new IllegalArgumentException("Unexpected operation type: " + operationType); + } + + return new Document() + .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) + .append(OPERATION_TYPE, operationType.getDescription()) + .append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType)) + .append(DOCUMENT_KEY, documentKey) + .append(CLUSTER_TIME, timestamp); + } + + private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) { + Document document = getUpdateDocument(oplogDocument); + String operationType = document.keySet().stream().findFirst().orElseThrow( + () -> new MongoServerException("Unspecified command operation type") + ); + + return new Document() + .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) + .append(OPERATION_TYPE, operationType) + .append(CLUSTER_TIME, timestamp); + } + + private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) { + switch (operationType) { + case INSERT: + return getUpdateDocument(document); + case DELETE: + return null; + case UPDATE: + return lookUpUpdateDocument(changeStreamDocument, document); + } + throw new IllegalArgumentException("Invalid operation type"); + } + + private Document getUpdateDocument(Document document) { + return (Document) document.get(OplogDocumentFields.O); + } + + private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) { + Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document)); + if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) { + String namespace = (String) document.get(OplogDocumentFields.NAMESPACE); + String databaseName = namespace.split("\\.")[0]; + String collectionName = namespace.split("\\.")[1]; + return mongoBackend.resolveDatabase(databaseName) + .resolveCollection(collectionName, true) + .queryAllAsStream() + .filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID))) + .findFirst() + .orElse(deltaUpdate); + } + return deltaUpdate; + } + + private Document getDeltaUpdate(Document updateDocument) { + Document delta = new Document(); + if (updateDocument.containsKey("$set")) { + delta.appendAll((Document) updateDocument.get("$set")); + } + if (updateDocument.containsKey("$unset")) { + delta.appendAll((Document) updateDocument.get("$unset")); + } + return delta; + } + +} diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java index 5fb0be01..6b4d722e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java @@ -1,30 +1,27 @@ package de.bwaldvogel.mongo.oplog; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import de.bwaldvogel.mongo.MongoBackend; import de.bwaldvogel.mongo.MongoCollection; -import de.bwaldvogel.mongo.backend.Cursor; import de.bwaldvogel.mongo.backend.CursorRegistry; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.Utils; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.BsonTimestamp; import de.bwaldvogel.mongo.bson.Document; -import de.bwaldvogel.mongo.exception.MongoServerException; public class CollectionBackedOplog implements Oplog { private static final long ELECTION_TERM = 1L; private static final String START_AT_OPERATION_TIME = "startAtOperationTime"; - private static final String FULL_DOCUMENT = "fullDocument"; private static final String START_AFTER = "startAfter"; private static final String RESUME_AFTER = "resumeAfter"; - private static final String OPERATION_TYPE = "operationType"; - private static final String CLUSTER_TIME = "clusterTime"; - private static final String DOCUMENT_KEY = "documentKey"; private final OplogClock oplogClock; private final MongoCollection collection; @@ -83,21 +80,19 @@ public void handleDropCollection(String namespace) { collection.addDocument(toOplogDropCollection(databaseName, collectionName)); } - private Stream streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation, - String namespace) { - return aggregation.runStagesAsStream(collection.queryAllAsStream() + private Stream streamOplog(OplogPosition position, String namespace) { + return collection.queryAllAsStream() .filter(document -> filterNamespace(document, namespace)) .filter(document -> { - BsonTimestamp timestamp = getOplogTimestamp(document); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document); OplogPosition documentOplogPosition = new OplogPosition(timestamp); return documentOplogPosition.isAfter(position); }) .sorted((o1, o2) -> { - BsonTimestamp timestamp1 = getOplogTimestamp(o1); - BsonTimestamp timestamp2 = getOplogTimestamp(o2); + BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1); + BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2); return timestamp1.compareTo(timestamp2); - }) - .map(document -> toChangeStreamResponseDocument(document, changeStreamDocument))); + }); } private static boolean filterNamespace(Document document, String namespace) { @@ -110,7 +105,16 @@ private static boolean filterNamespace(Document document, String namespace) { } @Override - public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { + public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) { + return new OplogCursor( + cursorRegistry.generateCursorId(), + position -> streamOplog(position, namespace), + initialOplogPosition + ); + } + + @Override + public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { Document startAfter = (Document) changeStreamDocument.get(START_AFTER); Document resumeAfter = (Document) changeStreamDocument.get(RESUME_AFTER); BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME); @@ -123,7 +127,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr String collectionName = Utils.getCollectionNameFromFullName(namespace); boolean resumeAfterTerminalEvent = collection.queryAllAsStream() .filter(document -> { - BsonTimestamp timestamp = getOplogTimestamp(document); + BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document); OplogPosition documentOplogPosition = new OplogPosition(timestamp); return initialOplogPosition.isAfter(documentOplogPosition.inclusive()); }) @@ -141,9 +145,9 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr initialOplogPosition = new OplogPosition(oplogClock.now()); } - Function> streamSupplier = - position -> streamOplog(changeStreamDocument, position, aggregation, namespace); - OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition); + OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition); + ChangeStreamCursor cursor + = new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor); cursorRegistry.add(cursor); return cursor; } @@ -185,91 +189,4 @@ private boolean isOplogCollection(String namespace) { return collection.getFullName().equals(namespace); } - private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) { - switch (operationType) { - case INSERT: - return getUpdateDocument(document); - case DELETE: - return null; - case UPDATE: - return lookUpUpdateDocument(changeStreamDocument, document); - } - throw new IllegalArgumentException("Invalid operation type"); - } - - private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) { - Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document)); - if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) { - String namespace = (String) document.get(OplogDocumentFields.NAMESPACE); - String databaseName = namespace.split("\\.")[0]; - String collectionName = namespace.split("\\.")[1]; - return backend.resolveDatabase(databaseName) - .resolveCollection(collectionName, true) - .queryAllAsStream() - .filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID))) - .findFirst() - .orElse(deltaUpdate); - } - return deltaUpdate; - } - - private Document getDeltaUpdate(Document updateDocument) { - Document delta = new Document(); - if (updateDocument.containsKey("$set")) { - delta.appendAll((Document) updateDocument.get("$set")); - } - if (updateDocument.containsKey("$unset")) { - delta.appendAll((Document) updateDocument.get("$unset")); - } - return delta; - } - - private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) { - OperationType operationType = OperationType.fromCode(oplogDocument.get(OplogDocumentFields.OPERATION_TYPE).toString()); - Document documentKey = new Document(); - Document document = getUpdateDocument(oplogDocument); - BsonTimestamp timestamp = getOplogTimestamp(oplogDocument); - OplogPosition oplogPosition = new OplogPosition(timestamp); - switch (operationType) { - case UPDATE: - case DELETE: - documentKey = document; - break; - case INSERT: - documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID)); - break; - case COMMAND: - return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp); - default: - throw new IllegalArgumentException("Unexpected operation type: " + operationType); - } - - return new Document() - .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) - .append(OPERATION_TYPE, operationType.getDescription()) - .append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType)) - .append(DOCUMENT_KEY, documentKey) - .append(CLUSTER_TIME, timestamp); - } - - private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) { - Document document = getUpdateDocument(oplogDocument); - String operationType = document.keySet().stream().findFirst().orElseThrow( - () -> new MongoServerException("Unspecified command operation type") - ); - - return new Document() - .append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString())) - .append(OPERATION_TYPE, operationType) - .append(CLUSTER_TIME, timestamp); - } - - private static BsonTimestamp getOplogTimestamp(Document document) { - return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP); - } - - private static Document getUpdateDocument(Document document) { - return (Document) document.get(OplogDocumentFields.O); - } - } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/InvalidateOplogCursor.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/InvalidateOplogCursor.java index ea2a0dc5..8f84ec88 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/InvalidateOplogCursor.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/InvalidateOplogCursor.java @@ -4,9 +4,10 @@ import java.util.List; import de.bwaldvogel.mongo.backend.AbstractCursor; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.bson.Document; -class InvalidateOplogCursor extends AbstractCursor { +class InvalidateOplogCursor extends AbstractCursor implements TailableCursor { private final OplogPosition position; InvalidateOplogCursor(OplogPosition position) { @@ -27,4 +28,8 @@ public List takeDocuments(int numberToReturn) { return Collections.singletonList(result); } + @Override + public OplogPosition getPosition() { + return null; + } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java index c138c0b6..2bb6b412 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java @@ -2,8 +2,8 @@ import java.util.List; -import de.bwaldvogel.mongo.backend.Cursor; import de.bwaldvogel.mongo.backend.EmptyCursor; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.Document; @@ -35,7 +35,12 @@ public void handleDropCollection(String namespace) { } @Override - public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { + public TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition) { + return EmptyCursor.get(); + } + + @Override + public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) { return EmptyCursor.get(); } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java index e6572623..2fd39c1e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java @@ -2,7 +2,7 @@ import java.util.List; -import de.bwaldvogel.mongo.backend.Cursor; +import de.bwaldvogel.mongo.backend.TailableCursor; import de.bwaldvogel.mongo.backend.aggregation.Aggregation; import de.bwaldvogel.mongo.bson.Document; @@ -16,5 +16,7 @@ public interface Oplog { void handleDropCollection(String namespace); - Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation); + TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition); + + TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java index a895d20a..ee2d3f28 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java @@ -1,20 +1,23 @@ package de.bwaldvogel.mongo.oplog; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import de.bwaldvogel.mongo.backend.CollectionUtils; import de.bwaldvogel.mongo.backend.AbstractCursor; +import de.bwaldvogel.mongo.backend.TailableCursor; +import de.bwaldvogel.mongo.bson.BsonTimestamp; import de.bwaldvogel.mongo.bson.Document; -public class OplogCursor extends AbstractCursor { +public class OplogCursor extends AbstractCursor implements TailableCursor { private final Function> oplogStream; private OplogPosition position; - public OplogCursor(long cursorId, Function> oplogStream, OplogPosition position) { + OplogCursor(long cursorId, Function> oplogStream, OplogPosition position) { super(cursorId); this.oplogStream = oplogStream; this.position = position; @@ -27,6 +30,8 @@ public boolean isEmpty() { @Override public List takeDocuments(int numberToReturn) { + emulateWaitingForAllShards(); + Stream stream = oplogStream.apply(position); if (numberToReturn > 0) { @@ -38,7 +43,18 @@ public List takeDocuments(int numberToReturn) { return documents; } - OplogPosition getPosition() { + private void emulateWaitingForAllShards() { + try { + // artificial delay to avoid 100% CPU usage when starting multiple ChangeStreams + // emulates real ChangeStream behaviour of waiting for all shards to provide data + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + + @Override + public OplogPosition getPosition() { return position; } @@ -48,9 +64,9 @@ private void updatePosition(List documents) { } } - private static OplogPosition getOplogPosition(Document document) { - Document id = (Document) document.get(OplogDocumentFields.ID); - return OplogPosition.fromDocument(id); + private static OplogPosition getOplogPosition(Document oplogDocument) { + final BsonTimestamp oplogTimestamp = OplogUtils.getOplogTimestamp(oplogDocument); + return OplogPosition.fromTimestamp(oplogTimestamp); } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java index 642e2f80..69fe147e 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogPosition.java @@ -29,6 +29,10 @@ private static OplogPosition fromHexString(String hexString) { return new OplogPosition(Long.parseLong(hexString, 16)); } + public static OplogPosition fromTimestamp(BsonTimestamp timestamp) { + return new OplogPosition(timestamp); + } + public static OplogPosition fromDocument(Document document) { return fromHexString((String) document.get(OplogDocumentFields.ID_DATA_KEY)); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java new file mode 100644 index 00000000..26417e47 --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/OplogUtils.java @@ -0,0 +1,11 @@ +package de.bwaldvogel.mongo.oplog; + +import de.bwaldvogel.mongo.bson.BsonTimestamp; +import de.bwaldvogel.mongo.bson.Document; + +class OplogUtils { + + static BsonTimestamp getOplogTimestamp(Document document) { + return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP); + } +} diff --git a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java index 482e98aa..b4d6fff0 100755 --- a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java +++ b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java @@ -7,18 +7,23 @@ import static de.bwaldvogel.mongo.backend.TestUtils.json; import static de.bwaldvogel.mongo.backend.TestUtils.toArray; import static java.util.Collections.singletonList; +import static org.assertj.core.groups.Tuple.tuple; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.assertj.core.api.Assertions; import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonTimestamp; @@ -40,6 +45,8 @@ import com.mongodb.reactivestreams.client.Success; import de.bwaldvogel.mongo.oplog.OperationType; +import io.reactivex.Flowable; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; public abstract class AbstractOplogTest extends AbstractTest { @@ -456,4 +463,66 @@ private static T getSingleValue(TestSubscriber subscriber) { return subscriber.values().get(0); } + @Test + public void testMultipleChangeStreams() throws InterruptedException { + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1"))) + .test().awaitDone(5, TimeUnit.SECONDS).assertComplete(); + + final int changeStreamCount = 32; + + List pipeline = singletonList(match(Filters.eq("fullDocument.bu", "abc"))); + + final TestSubscriber>>> streamSubscriber + = new TestSubscriber<>(); + + Flowable.range(1, changeStreamCount) + .flatMapSingle(index -> { + return Flowable.fromPublisher(asyncCollection.watch(pipeline)) + .take(2) + .toList() + .map(changeStreamDocuments -> { + return new AbstractMap.SimpleEntry<>(index, changeStreamDocuments); + }) + .subscribeOn(Schedulers.io()); // subscribe to change streams concurrently + }) + .toMap(Map.Entry::getKey, Map.Entry::getValue) + .toFlowable() + .subscribe(streamSubscriber); + + // give time for all ChangeStream Publishers to be subscribed to + // todo: expose API to get cursors from Backend and wait until 'changeStreamCount' cursors + TimeUnit.SECONDS.sleep(5); + + Flowable.concat( + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 2, bu: 'abc'"))), + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 3, bu: 'xyz'"))), + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 4, bu: 'abc'"))) + ).test().awaitDone(15, TimeUnit.SECONDS).assertComplete(); + + final Map>> results = streamSubscriber + .awaitDone(30, TimeUnit.SECONDS) + .assertComplete() + .assertValueCount(1) + .values().get(0); + + Assertions.assertThat(IntStream.rangeClosed(1, changeStreamCount)) + .allSatisfy(index -> { + Assertions.assertThat(results).containsKey(index); + + final List> emits = results.get(index); + Assertions.assertThat(emits).isNotNull() + .extracting( + document -> { + return document.getDocumentKey().getInt32("_id").getValue(); + }, + document -> { + return document.getFullDocument() != null + ? document.getFullDocument().getString("bu") + : null; + } + ) + .containsExactly(tuple(2, "abc"), tuple(4, "abc")); + }); + } + }