Skip to content

Commit

Permalink
Intermittent failures in MongoDbTest.testTailingConsumer() and MongoD…
Browse files Browse the repository at this point in the history
…bTest.testPersistentTailingConsumer() apache#2658
  • Loading branch information
JiriOndrusek committed Jun 8, 2021
1 parent 3ce5294 commit ea64347
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,31 @@ public String restartRoute(@PathParam("routeId") String routeId, @PathParam("ope
}

@GET
@Path("/resultsReset/{resultId}")
@Path("/results/{resultId}")
@Produces(MediaType.APPLICATION_JSON)
public Map getResultsAndReset(@PathParam("resultId") String resultId) {
public Map getResults(@PathParam("resultId") String resultId) {
final List<Document> list = results.get(resultId);
synchronized (list) {
int size = list.size();
Document last = null;
if (!list.isEmpty()) {
last = list.get(size - 1);
list.clear();
}
return CollectionHelper.mapOf("size", size, "last", last);
}
}

@GET
@Path("/resultsReset/{resultId}")
public void resetResults(@PathParam("resultId") String resultId) {
final List<Document> list = results.get(resultId);
synchronized (list) {
if (!list.isEmpty()) {
list.clear();
}
}
}

@Path("/convertMapToDocument")
@POST
@Consumes(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.camel.quarkus.component.mongodb.it;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
Expand Down Expand Up @@ -78,9 +78,9 @@ public void configure() {
@Named("results")
Map<String, List<Document>> results() {
Map<String, List<Document>> result = new HashMap<>();
result.put(COLLECTION_TAILING, new CopyOnWriteArrayList<>());
result.put(COLLECTION_PERSISTENT_TAILING, new CopyOnWriteArrayList<>());
result.put(COLLECTION_STREAM_CHANGES, new CopyOnWriteArrayList<>());
result.put(COLLECTION_TAILING, new LinkedList<>());
result.put(COLLECTION_PERSISTENT_TAILING, new LinkedList<>());
result.put(COLLECTION_STREAM_CHANGES, new LinkedList<>());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -143,7 +142,6 @@ public void testDynamicOperation() {

}

@Disabled("https://github.com/apache/camel-quarkus/issues/2658")
@Test
public void testTailingConsumer() throws Exception {
MongoCollection collection = db.getCollection(COLLECTION_TAILING, Document.class);
Expand All @@ -153,12 +151,11 @@ public void testTailingConsumer() throws Exception {

//verify continuously
if (i % CAP_NUMBER == 0) {
waitForTailingResults(CAP_NUMBER, "value" + i, COLLECTION_TAILING);
waitAndResetTailingResults(CAP_NUMBER, "value" + i, COLLECTION_TAILING);
}
}
}

@Disabled("https://github.com/apache/camel-quarkus/issues/2658")
@Test
public void testPersistentTailingConsumer() throws Exception {
MongoCollection collection = db.getCollection(COLLECTION_PERSISTENT_TAILING, Document.class);
Expand All @@ -168,7 +165,7 @@ public void testPersistentTailingConsumer() throws Exception {

//verify continuously
if (i % CAP_NUMBER == 0) {
waitForTailingResults(CAP_NUMBER, "value" + i, COLLECTION_PERSISTENT_TAILING);
waitAndResetTailingResults(CAP_NUMBER, "value" + i, COLLECTION_PERSISTENT_TAILING);
}
}

Expand All @@ -183,7 +180,7 @@ public void testPersistentTailingConsumer() throws Exception {

//verify continuously
if (i % CAP_NUMBER == 0) {
waitForTailingResults(CAP_NUMBER, "value" + i, COLLECTION_PERSISTENT_TAILING);
waitAndResetTailingResults(CAP_NUMBER, "value" + i, COLLECTION_PERSISTENT_TAILING);
}
}
}
Expand Down Expand Up @@ -214,7 +211,7 @@ public void testStreamConsumerWithFilter() throws Exception {
collection.insertOne(new Document("increasing", i).append("string", "value" + i));
}

waitForTailingResults(1, "value2", COLLECTION_STREAM_CHANGES);
waitAndResetTailingResults(1, "value2", COLLECTION_STREAM_CHANGES);
}

@Test
Expand Down Expand Up @@ -279,14 +276,20 @@ public void testOutputTypeDocument() throws Exception {

}

private void waitForTailingResults(int expectedSize, String laststring, String resultId) {
private void waitAndResetTailingResults(int expectedSize, String laststring, String resultId) {
await().atMost(5, TimeUnit.SECONDS).until(
() -> RestAssured
.given().contentType(ContentType.JSON)
.get("/mongodb/resultsReset/" + resultId)
.get("/mongodb/results/" + resultId)
.then()
.statusCode(200)
.extract().as(Map.class),
m -> ((int) m.get("size") == expectedSize && laststring.equals(((Map) m.get("last")).get("string"))));

RestAssured
.given().contentType(ContentType.JSON)
.get("/mongodb/resultsReset/" + resultId)
.then()
.statusCode(204);
}
}

0 comments on commit ea64347

Please sign in to comment.