Skip to content

Commit

Permalink
Quick fix - Intermittent failures in MongoDbTest.testTailingConsumer()
Browse files Browse the repository at this point in the history
  • Loading branch information
JiriOndrusek committed May 25, 2021
1 parent 1a95c39 commit f971e01
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,16 @@ public String restartRoute(@PathParam("routeId") String routeId, @PathParam("ope
@Path("/resultsReset/{resultId}")
@Produces(MediaType.APPLICATION_JSON)
public Map getResultsAndReset(@PathParam("resultId") String resultId) {
int size = results.get(resultId).size();
Document last = null;
if (!results.get(resultId).isEmpty()) {
last = results.get(resultId).get(size - 1);
results.get(resultId).clear();
synchronized (results) {
int size = results.get(resultId).size();
Document last = null;
if (!results.get(resultId).isEmpty()) {
last = results.get(resultId).get(size - 1);
results.get(resultId).clear();
}

return CollectionHelper.mapOf("size", size, "last", last);
}
return CollectionHelper.mapOf("size", size, "last", last);
}

@Path("/convertMapToDocument")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,30 @@ public class MongoDbRoute extends RouteBuilder {
public void configure() {
from(String.format("mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_TAILING))
.process(e -> results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class)));
.process(e -> {
synchronized (results) {
results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class));
}
});

from(String.format(
"mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_PERSISTENT_TAILING))
.id(COLLECTION_PERSISTENT_TAILING)
.process(e -> results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class)));
.process(e -> {
synchronized (results) {
results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class));
}
});

from(String.format("mongodb:%s?database=test&collection=%s&consumerType=changeStreams",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_STREAM_CHANGES))
.routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}")
.process(e -> results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class)));
.process(e -> {
synchronized (results) {
results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class));
}
});
}

@Produces
Expand Down

0 comments on commit f971e01

Please sign in to comment.