Skip to content

Commit

Permalink
Fixup 8ce5068 Intermittent failures in MongoDbTest.testTailingConsume…
Browse files Browse the repository at this point in the history
…r() #2658
  • Loading branch information
ppalaga committed Jun 4, 2021
1 parent 101ff99 commit 50f5453
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ public String restartRoute(@PathParam("routeId") String routeId, @PathParam("ope
@Path("/resultsReset/{resultId}")
@Produces(MediaType.APPLICATION_JSON)
public Map getResultsAndReset(@PathParam("resultId") String resultId) {
synchronized (results) {
int size = results.get(resultId).size();
final List<Document> list = results.get(resultId);
synchronized (list) {
int size = list.size();
Document last = null;
if (!results.get(resultId).isEmpty()) {
last = results.get(resultId).get(size - 1);
results.get(resultId).clear();
if (!list.isEmpty()) {
last = list.get(size - 1);
list.clear();
}

return CollectionHelper.mapOf("size", size, "last", last);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public void configure() {
from(String.format("mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_TAILING))
.process(e -> {
synchronized (results) {
results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class));
final List<Document> list = results.get(COLLECTION_TAILING);
synchronized (list) {
list.add(e.getMessage().getBody(Document.class));
}
});

Expand All @@ -55,17 +56,19 @@ public void configure() {
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_PERSISTENT_TAILING))
.id(COLLECTION_PERSISTENT_TAILING)
.process(e -> {
synchronized (results) {
results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class));
final List<Document> list = results.get(COLLECTION_PERSISTENT_TAILING);
synchronized (list) {
list.add(e.getMessage().getBody(Document.class));
}
});

from(String.format("mongodb:%s?database=test&collection=%s&consumerType=changeStreams",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_STREAM_CHANGES))
.routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}")
.process(e -> {
synchronized (results) {
results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class));
final List<Document> list = results.get(COLLECTION_STREAM_CHANGES);
synchronized (list) {
list.add(e.getMessage().getBody(Document.class));
}
});
}
Expand Down

0 comments on commit 50f5453

Please sign in to comment.