Skip to content

Commit

Permalink
Merge pull request #23204: Properly close Spark (streaming) context i…
Browse files Browse the repository at this point in the history
…f Pipeline translation fails
  • Loading branch information
aromanenko-dev authored Sep 13, 2022
2 parents d940595 + d635d44 commit bee135e
Showing 1 changed file with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,28 @@ public JavaStreamingContext call() throws Exception {
Duration batchDuration = new Duration(options.getBatchIntervalMillis());
LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds());

JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
JavaSparkContext sparkCtx = SparkContextFactory.getSparkContext(options);
JavaStreamingContext streamingCtx = new JavaStreamingContext(sparkCtx, batchDuration);

// We must first init accumulators since translators expect them to be instantiated.
SparkRunner.initAccumulators(options, jsc);
SparkRunner.initAccumulators(options, sparkCtx);
// do not need to create a MetricsPusher instance here because if is called in SparkRunner.run()

EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
EvaluationContext evalCtx = new EvaluationContext(sparkCtx, pipeline, options, streamingCtx);
// update cache candidates
SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
ctxt.computeOutputs();
SparkRunner.updateCacheCandidates(pipeline, translator, evalCtx);
try {
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, evalCtx));
} catch (RuntimeException e) {
streamingCtx.stop(false, false);
SparkContextFactory.stopSparkContext(sparkCtx);
throw e;
}
evalCtx.computeOutputs();

checkpoint(jssc, checkpointDir);
checkpoint(streamingCtx, checkpointDir);

return jssc;
return streamingCtx;
}

private void checkpoint(JavaStreamingContext jssc, CheckpointDir checkpointDir) {
Expand Down

0 comments on commit bee135e

Please sign in to comment.