Activate monitoring on NexmarkFlinkRunner
issue apache#28

Fix compilation issue after rebase + make checkstyle happy again
echauchot committed May 9, 2017
1 parent 319f7fc commit 3406101
Showing 9 changed files with 12 additions and 35 deletions.
File 1 of 9
@@ -18,8 +18,6 @@
package org.apache.beam.integration.nexmark;

import javax.annotation.Nullable;
import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.sdk.PipelineResult;

/**
* Run a query using the Apex runner.
File 2 of 9
@@ -17,11 +17,6 @@
*/
package org.apache.beam.integration.nexmark;

import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;

/**
* Run a single query using the Direct Runner.
*/
File 3 of 9
@@ -17,10 +17,6 @@
*/
package org.apache.beam.integration.nexmark;

import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkRunnerResult;
import org.apache.beam.sdk.PipelineResult;

/**
* Run a query using the Flink runner.
*/
@@ -42,7 +38,7 @@ protected int maxNumWorkers() {

@Override
protected boolean canMonitor() {
return false;
return true;
}

@Override
@@ -56,12 +52,6 @@ protected void waitForPublisherPreload() {
throw new UnsupportedOperationException();
}

@Override
@Nullable
protected NexmarkPerf monitor(NexmarkQuery query) {
return null;
}

public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
super(options);
}
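
With canMonitor() flipped to true and the null-returning monitor(NexmarkQuery) override removed above, the Flink runner now relies on the monitoring logic inherited from the base runner class. The snippet below is a minimal sketch, not part of this commit, of the kind of counter query such monitoring can issue against a job's PipelineResult through the Beam metrics API; the CounterProbe class is invented here for illustration, and the accessor names follow the metrics API of this Beam era.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

/** Illustrative only: sums a named counter from a job's PipelineResult. */
class CounterProbe {
  static long readCounter(PipelineResult job, String namespace, String name) {
    MetricQueryResults metrics = job.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(namespace, name))
            .build());
    long total = 0;
    for (MetricResult<Long> counter : metrics.counters()) {
      // attempted() is the latest value reported by the runner; committed()
      // is available on runners that support committed metrics.
      total += counter.attempted();
    }
    return total;
  }
}
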
File 4 of 9
@@ -17,10 +17,8 @@
*/
package org.apache.beam.integration.nexmark;

import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/**
File 5 of 9
@@ -66,7 +66,7 @@ protected boolean canMonitor() {

@Override
protected String getJobId(PipelineResult job) {
return ((DataflowPipelineJob)job).getJobId();
return ((DataflowPipelineJob) job).getJobId();
}

@Override
File 6 of 9
@@ -53,8 +53,6 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;

import static org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;

/**
* Run a single Nexmark query using a given configuration.
*/
@@ -203,7 +201,8 @@ protected long getTimestamp(
* Find a 'steady state' events/sec from {@code snapshots} and
* store it in {@code perf} if found.
*/
protected void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
protected void captureSteadyState(NexmarkPerf perf,
List<NexmarkPerf.ProgressSnapshot> snapshots) {
if (!options.isStreaming()) {
return;
}
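
The steady-state capture above is only attempted in streaming mode; the underlying idea is to walk the recorded progress snapshots, discard warm-up and cool-down samples, and derive events/sec from what remains. Below is a hedged illustration of that computation, not part of this commit; the Snapshot type, its secSinceStart/numEvents fields, and the warm-up/cool-down parameters are hypothetical stand-ins rather than the actual NexmarkPerf.ProgressSnapshot API.

class SteadyStateSketch {
  /** Hypothetical stand-in for a progress snapshot. */
  static class Snapshot {
    final double secSinceStart;
    final long numEvents;
    Snapshot(double secSinceStart, long numEvents) {
      this.secSinceStart = secSinceStart;
      this.numEvents = numEvents;
    }
  }

  /** Events/sec over snapshots inside [warmupSec, lastSnapshotTime - cooldownSec], or -1 if none. */
  static double steadyStateEventsPerSec(
      java.util.List<Snapshot> snapshots, double warmupSec, double cooldownSec) {
    if (snapshots.isEmpty()) {
      return -1;
    }
    double end = snapshots.get(snapshots.size() - 1).secSinceStart - cooldownSec;
    Snapshot first = null;
    Snapshot last = null;
    for (Snapshot s : snapshots) {
      if (s.secSinceStart < warmupSec || s.secSinceStart > end) {
        continue; // skip warm-up and cool-down samples
      }
      if (first == null) {
        first = s;
      }
      last = s;
    }
    if (first == null || last == first) {
      return -1; // no usable steady-state window
    }
    return (last.numEvents - first.numEvents) / (last.secSinceStart - first.secSinceStart);
  }
}
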
@@ -365,7 +364,9 @@ private NexmarkPerf currentPerf(
return perf;
}

String getJobId(PipelineResult job){return "";}
String getJobId(PipelineResult job) {
return "";
}

// TODO specific to dataflow, see if we can find an equivalent
/*
@@ -926,8 +927,8 @@ private void sinkResultsToBigQuery(
new TableFieldSchema().setName("index").setType("INTEGER"),
new TableFieldSchema().setName("value").setType("STRING")))));
NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
BigQueryIO.Write.Bound io =
BigQueryIO.Write.to(tableSpec)
BigQueryIO.Write io =
BigQueryIO.write().to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
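
The hunk above moves from the old BigQueryIO.Write.Bound type to the fluent BigQueryIO.write() builder. As a rough usage sketch, not part of this commit, the configured transform would then be applied to a PCollection of result rows; the PerfSink wrapper and the step name are illustrative.

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

class PerfSink {
  /** Illustrative only: applies the fluent-style write configured as in the hunk above. */
  static void sinkRows(PCollection<TableRow> rows, String tableSpec, TableSchema tableSchema) {
    BigQueryIO.Write io =
        BigQueryIO.write().to(tableSpec)
            .withSchema(tableSchema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
    rows.apply("WritePerfToBigQuery", io);
  }
}
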
File 7 of 9
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/**
@@ -39,7 +38,8 @@ public static void main(String[] args) {
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(NexmarkSparkOptions.class);
options.setRunner(SparkRunner.class);
// options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
new NexmarkSparkDriver().runAll(options, runner);
}
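
Note that the driver above now hard-codes SparkRunnerDebugger, which translates the pipeline for inspection rather than executing it on Spark, while the commented line keeps the real SparkRunner at hand. Below is a minimal sketch of that usage, not part of this commit, with a toy pipeline standing in for the Nexmark queries.

import org.apache.beam.runners.spark.SparkRunnerDebugger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

class SparkDebugSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.setRunner(SparkRunnerDebugger.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("ToyInput", Create.of(1, 2, 3)); // stand-in for the real Nexmark query pipeline
    // With the debugger runner, run() translates the pipeline to its Spark
    // equivalent for inspection instead of launching it on a cluster.
    pipeline.run();
  }
}
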
File 8 of 9
@@ -17,10 +17,6 @@
*/
package org.apache.beam.integration.nexmark;

import javax.annotation.Nullable;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.sdk.PipelineResult;

/**
* Run a query using the Spark runner.
*/
File 9 of 9
@@ -37,7 +37,6 @@
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
@@ -325,8 +324,8 @@ public static void console(String format, Object... args) {
* Setup pipeline with codes and some other options.
*/
public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
PipelineRunner<?> runner = p.getRunner();
//TODO Ismael check
// PipelineRunner<?> runner = p.getRunner();
// if (runner instanceof DirectRunner) {
// // Disable randomization of output since we want to check batch and streaming match the
// // model both locally and on the cloud.
