Skip to content

Commit

Permalink
Improve queries tests
Browse files Browse the repository at this point in the history
Fix Runner categories in tests

Add streaming unit tests and corresponding labels
issue apache#37

Update numEvents: results are no more linked to the number of events

issue apache#22
  • Loading branch information
echauchot committed Jun 16, 2017
1 parent 7714618 commit 27c29e1
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ log4j.logger.org.apache.beam.runners.direct=WARN
log4j.logger.org.apache.beam.sdk=WARN

# Nexmark specific
log4j.logger.org.apache.beam.integration.nexmark=ALL
log4j.logger.org.apache.beam.integration.nexmark=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
Expand All @@ -35,81 +36,156 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Test the various NEXMark queries yield results coherent with their models.
*/
/** Test the various NEXMark queries yield results coherent with their models. */
@RunWith(JUnit4.class)
public class QueryTest {
private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
@Rule
public TestPipeline p = TestPipeline.create();

static {
//careful, results of tests are linked to numEvents value
// careful, results of tests are linked to numEventGenerators because of timestamp generation
CONFIG.numEventGenerators = 1;
CONFIG.numEvents = 100;
CONFIG.numEvents = 1000;
}

@Rule public TestPipeline p = TestPipeline.create();

/** Test {@code query} matches {@code model}. */
private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) {
private void queryMatchesModel(
String name, NexmarkQuery query, NexmarkQueryModel model, boolean streamingMode) {
NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
PCollection<TimestampedValue<KnownSize>> results =
p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
//TODO Ismael this should not be called explicitly
results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
PCollection<TimestampedValue<KnownSize>> results;
if (streamingMode) {
results =
p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
//TODO Ismael this should not be called explicitly
results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
} else {
results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
//TODO Ismael this should not be called explicitly
results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
}
PAssert.that(results).satisfies(model.assertionFor());
PipelineResult result = p.run();
result.waitUntilFinish();
}

@Test
public void query0MatchesModel() {
queryMatchesModel("Query0Test", new Query0(CONFIG), new Query0Model(CONFIG));
@Category(NeedsRunner.class)
public void query0MatchesModelBatch() {
queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new Query0Model(CONFIG), false);
}

@Test
@Category(NeedsRunner.class)
public void query0MatchesModelStreaming() {
queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new Query0Model(CONFIG), true);
}

@Test
@Category(NeedsRunner.class)
public void query1MatchesModelBatch() {
queryMatchesModel("Query1TestBatch", new Query1(CONFIG), new Query1Model(CONFIG), false);
}

@Test
@Category(NeedsRunner.class)
public void query1MatchesModelStreaming() {
queryMatchesModel("Query1TestStreaming", new Query1(CONFIG), new Query1Model(CONFIG), true);
}

@Test
@Category(NeedsRunner.class)
public void query2MatchesModelBatch() {
queryMatchesModel("Query2TestBatch", new Query2(CONFIG), new Query2Model(CONFIG), false);
}

@Test
@Category(NeedsRunner.class)
public void query2MatchesModelStreaming() {
queryMatchesModel("Query2TestStreaming", new Query2(CONFIG), new Query2Model(CONFIG), true);
}

@Test
@Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
public void query3MatchesModelBatch() {
queryMatchesModel("Query3TestBatch", new Query3(CONFIG), new Query3Model(CONFIG), false);
}

@Test
@Category({NeedsRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
public void query3MatchesModelStreaming() {
queryMatchesModel("Query3TestStreaming", new Query3(CONFIG), new Query3Model(CONFIG), true);
}

@Test
@Category(NeedsRunner.class)
public void query4MatchesModelBatch() {
queryMatchesModel("Query4TestBatch", new Query4(CONFIG), new Query4Model(CONFIG), false);
}

@Test
@Category(NeedsRunner.class)
public void query4MatchesModelStreaming() {
queryMatchesModel("Query4TestStreaming", new Query4(CONFIG), new Query4Model(CONFIG), true);
}

@Test
@Category(NeedsRunner.class)
public void query5MatchesModelBatch() {
queryMatchesModel("Query5TestBatch", new Query5(CONFIG), new Query5Model(CONFIG), false);
}

@Test
public void query1MatchesModel() {
queryMatchesModel("Query1Test", new Query1(CONFIG), new Query1Model(CONFIG));
@Category(NeedsRunner.class)
public void query5MatchesModelStreaming() {
queryMatchesModel("Query5TestStreaming", new Query5(CONFIG), new Query5Model(CONFIG), true);
}

@Test
public void query2MatchesModel() {
queryMatchesModel("Query2Test", new Query2(CONFIG), new Query2Model(CONFIG));
@Category(NeedsRunner.class)
public void query6MatchesModelBatch() {
queryMatchesModel("Query6TestBatch", new Query6(CONFIG), new Query6Model(CONFIG), false);
}

@Test
public void query3MatchesModel() {
queryMatchesModel("Query3Test", new Query3(CONFIG), new Query3Model(CONFIG));
@Category(NeedsRunner.class)
public void query6MatchesModelStreaming() {
queryMatchesModel("Query6TestStreaming", new Query6(CONFIG), new Query6Model(CONFIG), true);
}

@Test
public void query4MatchesModel() {
queryMatchesModel("Query4Test", new Query4(CONFIG), new Query4Model(CONFIG));
@Category(NeedsRunner.class)
public void query7MatchesModelBatch() {
queryMatchesModel("Query7TestBatch", new Query7(CONFIG), new Query7Model(CONFIG), false);
}

@Test
public void query5MatchesModel() {
queryMatchesModel("Query5Test", new Query5(CONFIG), new Query5Model(CONFIG));
@Category(NeedsRunner.class)
public void query7MatchesModelStreaming() {
queryMatchesModel("Query7TestStreaming", new Query7(CONFIG), new Query7Model(CONFIG), true);
}

@Test
public void query6MatchesModel() {
queryMatchesModel("Query6Test", new Query6(CONFIG), new Query6Model(CONFIG));
@Category(NeedsRunner.class)
public void query8MatchesModelBatch() {
queryMatchesModel("Query8TestBatch", new Query8(CONFIG), new Query8Model(CONFIG), false);
}

@Test
@Category({UsesStatefulParDo.class, UsesTimersInParDo.class})
public void query7MatchesModel() {
queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG));
@Category(NeedsRunner.class)
public void query8MatchesModelStreaming() {
queryMatchesModel("Query8TestStreaming", new Query8(CONFIG), new Query8Model(CONFIG), true);
}

@Test
public void query8MatchesModel() {
queryMatchesModel("Query8Test", new Query8(CONFIG), new Query8Model(CONFIG));
@Category(NeedsRunner.class)
public void query9MatchesModelBatch() {
queryMatchesModel("Query9TestBatch", new Query9(CONFIG), new Query9Model(CONFIG), false);
}

@Test
public void query9MatchesModel() {
queryMatchesModel("Query9Test", new Query9(CONFIG), new Query9Model(CONFIG));
@Category(NeedsRunner.class)
public void query9MatchesModelStreaming() {
queryMatchesModel("Query9TestStreaming", new Query9(CONFIG), new Query9Model(CONFIG), true);
}
}

0 comments on commit 27c29e1

Please sign in to comment.