From f971e01904c334d9229e8c25e0c44c25caf6c98c Mon Sep 17 00:00:00 2001 From: JiriOndrusek Date: Tue, 25 May 2021 13:42:21 +0200 Subject: [PATCH] Quick fix - Intermittent failures in MongoDbTest.testTailingConsumer() #2658 --- .../component/mongodb/it/MongoDbResource.java | 15 +++++++++------ .../component/mongodb/it/MongoDbRoute.java | 18 +++++++++++++++--- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbResource.java b/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbResource.java index fd0c3f9d1dea..82039f1b0c1c 100644 --- a/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbResource.java +++ b/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbResource.java @@ -147,13 +147,16 @@ public String restartRoute(@PathParam("routeId") String routeId, @PathParam("ope @Path("/resultsReset/{resultId}") @Produces(MediaType.APPLICATION_JSON) public Map getResultsAndReset(@PathParam("resultId") String resultId) { - int size = results.get(resultId).size(); - Document last = null; - if (!results.get(resultId).isEmpty()) { - last = results.get(resultId).get(size - 1); - results.get(resultId).clear(); + synchronized (results) { + int size = results.get(resultId).size(); + Document last = null; + if (!results.get(resultId).isEmpty()) { + last = results.get(resultId).get(size - 1); + results.get(resultId).clear(); + } + + return CollectionHelper.mapOf("size", size, "last", last); } - return CollectionHelper.mapOf("size", size, "last", last); } @Path("/convertMapToDocument") diff --git a/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbRoute.java b/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbRoute.java index 2f0085c9798b..028d23c7c029 100644 --- a/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbRoute.java +++ b/integration-tests/mongodb/src/main/java/org/apache/camel/quarkus/component/mongodb/it/MongoDbRoute.java @@ -44,18 +44,30 @@ public class MongoDbRoute extends RouteBuilder { public void configure() { from(String.format("mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing", MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_TAILING)) - .process(e -> results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class))); + .process(e -> { + synchronized (results) { + results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class)); + } + }); from(String.format( "mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin", MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_PERSISTENT_TAILING)) .id(COLLECTION_PERSISTENT_TAILING) - .process(e -> results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class))); + .process(e -> { + synchronized (results) { + results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class)); + } + }); from(String.format("mongodb:%s?database=test&collection=%s&consumerType=changeStreams", MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_STREAM_CHANGES)) .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}") - .process(e -> results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class))); + .process(e -> { + synchronized (results) { + results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class)); + } + }); } @Produces