From 27c29e1f7236a7405b70353989fb6d1b1f19cbff Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 3 Apr 2017 16:50:51 +0200 Subject: [PATCH] Improve queries tests Fix Runner categories in tests Add streaming unit tests and corresponding labels issue #37 Update numEvents: results are no more linked to the number of events issue #22 --- .../src/main/resources/log4j.properties | 2 +- .../nexmark/queries/QueryTest.java | 142 ++++++++++++++---- 2 files changed, 110 insertions(+), 34 deletions(-) diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties index 30d0a9df7f73..7dd57b542f86 100644 --- a/integration/java/nexmark/src/main/resources/log4j.properties +++ b/integration/java/nexmark/src/main/resources/log4j.properties @@ -27,7 +27,7 @@ log4j.logger.org.apache.beam.runners.direct=WARN log4j.logger.org.apache.beam.sdk=WARN # Nexmark specific -log4j.logger.org.apache.beam.integration.nexmark=ALL +log4j.logger.org.apache.beam.integration.nexmark=WARN # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=WARN diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java index dca2887bc000..284aa7e4f2c9 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java @@ -23,6 +23,7 @@ import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesStatefulParDo; @@ -35,81 +36,156 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Test the various NEXMark queries yield results coherent with their models. - */ +/** Test the various NEXMark queries yield results coherent with their models. */ @RunWith(JUnit4.class) public class QueryTest { private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone(); - @Rule - public TestPipeline p = TestPipeline.create(); static { - //careful, results of tests are linked to numEvents value + // careful, results of tests are linked to numEventGenerators because of timestamp generation CONFIG.numEventGenerators = 1; - CONFIG.numEvents = 100; + CONFIG.numEvents = 1000; } + @Rule public TestPipeline p = TestPipeline.create(); + /** Test {@code query} matches {@code model}. */ - private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) { + private void queryMatchesModel( + String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) { NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p); - PCollection> results = - p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query); - //TODO Ismael this should not be called explicitly - results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + PCollection> results; + if (streamingMode) { + results = + p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query); + //TODO Ismael this should not be called explicitly + results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } else { + results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query); + //TODO Ismael this should not be called explicitly + results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + } PAssert.that(results).satisfies(model.assertionFor()); PipelineResult result = p.run(); result.waitUntilFinish(); } @Test - public void query0MatchesModel() { - queryMatchesModel("Query0Test", new Query0(CONFIG), new Query0Model(CONFIG)); + @Category(NeedsRunner.class) + public void query0MatchesModelBatch() { + queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new Query0Model(CONFIG), false); + } + + @Test + @Category(NeedsRunner.class) + public void query0MatchesModelStreaming() { + queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new Query0Model(CONFIG), true); + } + + @Test + @Category(NeedsRunner.class) + public void query1MatchesModelBatch() { + queryMatchesModel("Query1TestBatch", new Query1(CONFIG), new Query1Model(CONFIG), false); + } + + @Test + @Category(NeedsRunner.class) + public void query1MatchesModelStreaming() { + queryMatchesModel("Query1TestStreaming", new Query1(CONFIG), new Query1Model(CONFIG), true); + } + + @Test + @Category(NeedsRunner.class) + public void query2MatchesModelBatch() { + queryMatchesModel("Query2TestBatch", new Query2(CONFIG), new Query2Model(CONFIG), false); + } + + @Test + @Category(NeedsRunner.class) + public void query2MatchesModelStreaming() { + queryMatchesModel("Query2TestStreaming", new Query2(CONFIG), new Query2Model(CONFIG), true); + } + + @Test + @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class}) + public void query3MatchesModelBatch() { + queryMatchesModel("Query3TestBatch", new Query3(CONFIG), new Query3Model(CONFIG), false); + } + + @Test + @Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class}) + public void query3MatchesModelStreaming() { + queryMatchesModel("Query3TestStreaming", new Query3(CONFIG), new Query3Model(CONFIG), true); + } + + @Test + @Category(NeedsRunner.class) + public void query4MatchesModelBatch() { + queryMatchesModel("Query4TestBatch", new Query4(CONFIG), new Query4Model(CONFIG), false); + } + + @Test + @Category(NeedsRunner.class) + public void query4MatchesModelStreaming() { + queryMatchesModel("Query4TestStreaming", new Query4(CONFIG), new Query4Model(CONFIG), true); + } + + @Test + @Category(NeedsRunner.class) + public void query5MatchesModelBatch() { + queryMatchesModel("Query5TestBatch", new Query5(CONFIG), new Query5Model(CONFIG), false); } @Test - public void query1MatchesModel() { - queryMatchesModel("Query1Test", new Query1(CONFIG), new Query1Model(CONFIG)); + @Category(NeedsRunner.class) + public void query5MatchesModelStreaming() { + queryMatchesModel("Query5TestStreaming", new Query5(CONFIG), new Query5Model(CONFIG), true); } @Test - public void query2MatchesModel() { - queryMatchesModel("Query2Test", new Query2(CONFIG), new Query2Model(CONFIG)); + @Category(NeedsRunner.class) + public void query6MatchesModelBatch() { + queryMatchesModel("Query6TestBatch", new Query6(CONFIG), new Query6Model(CONFIG), false); } @Test - public void query3MatchesModel() { - queryMatchesModel("Query3Test", new Query3(CONFIG), new Query3Model(CONFIG)); + @Category(NeedsRunner.class) + public void query6MatchesModelStreaming() { + queryMatchesModel("Query6TestStreaming", new Query6(CONFIG), new Query6Model(CONFIG), true); } @Test - public void query4MatchesModel() { - queryMatchesModel("Query4Test", new Query4(CONFIG), new Query4Model(CONFIG)); + @Category(NeedsRunner.class) + public void query7MatchesModelBatch() { + queryMatchesModel("Query7TestBatch", new Query7(CONFIG), new Query7Model(CONFIG), false); } @Test - public void query5MatchesModel() { - queryMatchesModel("Query5Test", new Query5(CONFIG), new Query5Model(CONFIG)); + @Category(NeedsRunner.class) + public void query7MatchesModelStreaming() { + queryMatchesModel("Query7TestStreaming", new Query7(CONFIG), new Query7Model(CONFIG), true); } @Test - public void query6MatchesModel() { - queryMatchesModel("Query6Test", new Query6(CONFIG), new Query6Model(CONFIG)); + @Category(NeedsRunner.class) + public void query8MatchesModelBatch() { + queryMatchesModel("Query8TestBatch", new Query8(CONFIG), new Query8Model(CONFIG), false); } @Test - @Category({UsesStatefulParDo.class, UsesTimersInParDo.class}) - public void query7MatchesModel() { - queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG)); + @Category(NeedsRunner.class) + public void query8MatchesModelStreaming() { + queryMatchesModel("Query8TestStreaming", new Query8(CONFIG), new Query8Model(CONFIG), true); } @Test - public void query8MatchesModel() { - queryMatchesModel("Query8Test", new Query8(CONFIG), new Query8Model(CONFIG)); + @Category(NeedsRunner.class) + public void query9MatchesModelBatch() { + queryMatchesModel("Query9TestBatch", new Query9(CONFIG), new Query9Model(CONFIG), false); } @Test - public void query9MatchesModel() { - queryMatchesModel("Query9Test", new Query9(CONFIG), new Query9Model(CONFIG)); + @Category(NeedsRunner.class) + public void query9MatchesModelStreaming() { + queryMatchesModel("Query9TestStreaming", new Query9(CONFIG), new Query9Model(CONFIG), true); } }