diff --git a/cdap-api-common/pom.xml b/cdap-api-common/pom.xml index ada8e8f44227..59b7adede6e5 100644 --- a/cdap-api-common/pom.xml +++ b/cdap-api-common/pom.xml @@ -43,6 +43,12 @@ junit junit + + io.cdap.cdap + cdap-error-api + 6.11.0-SNAPSHOT + compile + diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java new file mode 100644 index 000000000000..e947cbf0ee2e --- /dev/null +++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.api.exception; + +import io.cdap.cdap.error.api.ErrorTagProvider; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Sets the stage name in exception. 
+ */ +public class WrappedException extends RuntimeException implements ErrorTagProvider { + + private final String stageName; + private final Set errorTags = new HashSet<>(); + + public WrappedException(Throwable cause, String stageName) { + super(cause); + this.stageName = stageName; + this.errorTags.add(ErrorTag.PLUGIN); + } + + public WrappedException(String message, String stageName) { + super(message); + this.stageName = stageName; + this.errorTags.add(ErrorTag.PLUGIN); + } + + public WrappedException(Throwable cause, String message, String stageName) { + super(message, cause); + this.stageName = stageName; + this.errorTags.add(ErrorTag.PLUGIN); + } + + public String getStageName() { + return stageName; + } + + @Override + public Set getErrorTags() { + return Collections.unmodifiableSet(errorTags); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java index bd2b44807ed7..1eaeadc3845b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java @@ -15,15 +15,19 @@ */ package io.cdap.cdap.internal.app.runtime.workflow; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.Service; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.internal.app.runtime.AbstractProgramController; import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramRunId; +import java.util.List; import org.apache.twill.api.RunId; import org.apache.twill.common.Threads; import org.apache.twill.internal.ServiceListenerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * @@ -87,6 +91,15 @@ public void terminated(Service.State from) { @Override 
public void failed(Service.State from, Throwable failure) { + List causalChain = Throwables.getCausalChain(failure); + for(Throwable cause : causalChain) { + if (cause instanceof WrappedException) { + String stageName = ((WrappedException) cause).getStageName(); + LOG.error("Stage: {}", stageName); + MDC.put("Failed_Stage", stageName); + break; + } + } LOG.error("Workflow service '{}' failed.", serviceName, failure); error(failure); } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java new file mode 100644 index 000000000000..f368e865be57 --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java @@ -0,0 +1,86 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.etl.batch; + +import io.cdap.cdap.api.exception.WrappedException; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class WrappedInputFormat extends InputFormat { + private final InputFormat inputFormat; + private final String stageName; + + @Override + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + try { + return inputFormat.getSplits(jobContext); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + try { + return new WrappedRecordReader<>(inputFormat.createRecordReader(inputSplit, + taskAttemptContext), stageName); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + /** + * Returns the delegating {@link InputFormat} based on the current configuration. 
+ * + * @param classLoader the {@link ClassLoader} for loading input format + * @param inputFormatClassName the name of {@link InputFormat} class + * @throws IOException if failed to instantiate the input format class + */ + public WrappedInputFormat(ClassLoader classLoader, String inputFormatClassName, + String stageName) throws IOException { + this.stageName = stageName; + if (inputFormatClassName == null) { + throw new IllegalArgumentException("Missing configuration for the InputFormat to use"); + } + if (inputFormatClassName.equals(getClass().getName())) { + throw new IllegalArgumentException("Cannot delegate InputFormat to the same class"); + } + try { + //noinspection unchecked + @SuppressWarnings("unchecked") + Class> inputFormatClass = (Class>) classLoader.loadClass( + inputFormatClassName); + this.inputFormat = inputFormatClass.newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new IOException( + String.format("Unable to instantiate delegate input format %s", inputFormatClassName), e); + } + } +} diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java new file mode 100644 index 000000000000..36379afa8b73 --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.batch; + +import io.cdap.cdap.api.exception.WrappedException; +import java.io.IOException; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class WrappedRecordReader extends RecordReader { + private final RecordReader recordReader; + private final String stageName; + + public WrappedRecordReader(RecordReader recordReader, String stageName) { + this.recordReader = recordReader; + this.stageName = stageName; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + try { + recordReader.initialize(inputSplit, taskAttemptContext); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + try { + return recordReader.nextKeyValue(); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public K getCurrentKey() throws IOException, InterruptedException { + try { + return recordReader.getCurrentKey(); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public V getCurrentValue() throws IOException, InterruptedException { + try { + return recordReader.getCurrentValue(); + } catch (Exception e) { + if 
(stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public float getProgress() throws IOException, InterruptedException { + try { + return recordReader.getProgress(); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } + + @Override + public void close() throws IOException { + try { + recordReader.close(); + } catch (Exception e) { + if (stageName != null) { + throw new WrappedException(e, stageName); + } + throw e; + } + } +} diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java index 5c7861f276fb..1a460f2f25f6 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java @@ -33,6 +33,10 @@ private StageLoggingCaller(Caller delegate, String stageName) { this.stageName = stageName; } + public String getStageName() { + return stageName; + } + @Override public T call(Callable callable) throws Exception { MDC.put(Constants.MDC_STAGE_KEY, stageName); diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java index 29d1a3335e0f..891a7443b9bb 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java @@ -16,10 +16,14 @@ package io.cdap.cdap.etl.common.plugin; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.etl.api.PipelineConfigurer; 
import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * Wrapper around {@link Action} that makes sure logging, classloading, and other pipeline @@ -29,6 +33,7 @@ public class WrappedAction extends Action implements PluginWrapper { private final Action action; private final Caller caller; + private static final Logger LOG = LoggerFactory.getLogger(WrappedAction.class); public WrappedAction(Action action, Caller caller) { this.action = action; @@ -45,10 +50,20 @@ public void configurePipeline(final PipelineConfigurer pipelineConfigurer) { @Override public void run(final ActionContext context) throws Exception { - caller.call((Callable) () -> { - action.run(context); - return null; - }); + try { + caller.call((Callable) () -> { + action.run(context); + return null; + }); + } catch(Exception e) { + if (caller instanceof StageLoggingCaller) { + String stageName = ((StageLoggingCaller) caller).getStageName(); + MDC.put("Failed_Stage", stageName); + LOG.error("Stage: {}", stageName); + throw new WrappedException(e, stageName); + } + throw e; + } } @Override diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java index 18e84fe419b7..dd0904796c64 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java @@ -17,6 +17,7 @@ package io.cdap.cdap.etl.common.plugin; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.PipelineConfigurer; import 
io.cdap.cdap.etl.api.batch.BatchRuntimeContext; @@ -24,6 +25,9 @@ import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.batch.BatchSource; import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * Wrapper around {@link BatchSource} that makes sure logging, classloading, and other pipeline @@ -40,6 +44,7 @@ public class WrappedBatchSink private final BatchSink batchSink; private final Caller caller; private final OperationTimer operationTimer; + private static final Logger LOG = LoggerFactory.getLogger(WrappedBatchSink.class); public WrappedBatchSink(BatchSink batchSink, Caller caller, OperationTimer operationTimer) { @@ -73,6 +78,14 @@ public void transform(IN input, batchSink.transform(input, new UntimedEmitter<>(emitter, operationTimer)); return null; }); + } catch(Exception e) { + if (caller instanceof StageLoggingCaller) { + String stageName = ((StageLoggingCaller) caller).getStageName(); + MDC.put("Failed_Stage", stageName); + LOG.error("Stage: {}", stageName); + throw new WrappedException(e, stageName); + } + throw e; } finally { operationTimer.reset(); } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java index b2129a24c93f..0d1231934e41 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java @@ -17,12 +17,16 @@ package io.cdap.cdap.etl.common.plugin; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import 
io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * Wrapper around {@link BatchSource} that makes sure logging, classloading, and other pipeline @@ -39,6 +43,7 @@ public class WrappedBatchSource private final BatchSource batchSource; private final Caller caller; private final OperationTimer operationTimer; + private static final Logger LOG = LoggerFactory.getLogger(WrappedBatchSource.class); public WrappedBatchSource(BatchSource batchSource, Caller caller, OperationTimer operationTimer) { @@ -71,6 +76,14 @@ public void transform(KeyValue input, Emitter emitter) thro batchSource.transform(input, new UntimedEmitter<>(emitter, operationTimer)); return null; }); + } catch(Exception e) { + if (caller instanceof StageLoggingCaller) { + String stageName = ((StageLoggingCaller) caller).getStageName(); + MDC.put("Failed_Stage", stageName); + LOG.error("Stage: {}", stageName); + throw new WrappedException(e, stageName); + } + throw e; } finally { operationTimer.reset(); } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java index ef7408d0db3c..e28ddb6f14a2 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java @@ -16,12 +16,16 @@ package io.cdap.cdap.etl.common.plugin; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.StageSubmitterContext; import io.cdap.cdap.etl.api.Transform; import io.cdap.cdap.etl.api.TransformContext; import 
java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * Wrapper around a {@link Transform} that makes sure logging, classloading, and other pipeline @@ -36,6 +40,7 @@ public class WrappedTransform extends Transform implements private final Transform transform; private final Caller caller; private final OperationTimer operationTimer; + private static final Logger LOG = LoggerFactory.getLogger(WrappedTransform.class); public WrappedTransform(Transform transform, Caller caller, OperationTimer operationTimer) { @@ -92,6 +97,14 @@ public void transform(IN input, Emitter emitter) throws Exception { transform.transform(input, new UntimedEmitter<>(emitter, operationTimer)); return null; }); + } catch(Exception e) { + if (caller instanceof StageLoggingCaller) { + String stageName = ((StageLoggingCaller) caller).getStageName(); + MDC.put("Failed_Stage", stageName); + LOG.error("Stage: {}", stageName); + throw new WrappedException(e, stageName); + } + throw e; } finally { operationTimer.reset(); } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java index 72b624ff1034..2d482f1c23b2 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java @@ -25,6 +25,7 @@ import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * Failure collector that logs the failures. 
@@ -32,6 +33,7 @@ public class LoggingFailureCollector extends DefaultFailureCollector { private static final Logger LOG = LoggerFactory.getLogger(LoggingFailureCollector.class); + private String stageName; /** * Failure collector that logs the failures. @@ -41,6 +43,7 @@ public class LoggingFailureCollector extends DefaultFailureCollector { */ public LoggingFailureCollector(String stageName, Map inputSchemas) { super(stageName, inputSchemas); + this.stageName = stageName; } @Override @@ -57,7 +60,9 @@ public ValidationException getOrThrowException() throws ValidationException { } List failures = validationException.getFailures(); - LOG.error("Encountered '{}' validation failures: {}{}", failures.size(), System.lineSeparator(), + MDC.put("Failed_Stage", stageName); + LOG.error("Encountered '{}' validation failures in stage {}: {}{}", failures.size(), stageName, + System.lineSeparator(), IntStream.range(0, failures.size()) .mapToObj( index -> String.format("%d. %s", index + 1, failures.get(index).getFullMessage())) diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java index 6349970d4f79..abab027eba00 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java @@ -347,7 +347,7 @@ private BatchCollectionFactory pullInternal(SQLDataset dataset) throws SQ ClassLoader classLoader = Objects.firstNonNull(Thread.currentThread().getContextClassLoader(), getClass().getClassLoader()); JavaPairRDD pairRDD = RDDUtils.readUsingInputFormat(jsc, sqlPullDataset, classLoader, Object.class, - Object.class); + Object.class, null); JavaRDD rdd = pairRDD.flatMap(new 
TransformFromPairFunction(sqlPullDataset.fromKeyValue())) .map(f -> { return f; diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java index f3cc3e25e797..a7441344dd1b 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java @@ -17,6 +17,7 @@ package io.cdap.cdap.etl.spark.batch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.SetMultimap; import com.google.gson.Gson; @@ -26,6 +27,7 @@ import io.cdap.cdap.api.data.DatasetContext; import io.cdap.cdap.api.data.batch.InputFormatProvider; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.api.macro.MacroEvaluator; import io.cdap.cdap.api.spark.JavaSparkExecutionContext; import io.cdap.cdap.api.spark.JavaSparkMain; @@ -68,6 +70,7 @@ import org.apache.spark.sql.SQLContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import scala.Tuple2; import java.io.BufferedReader; @@ -149,15 +152,29 @@ protected SparkCollection> getSource(StageSpec stageSpec, FlatMapFunction, RecordInfo> sourceFunction = new BatchSourceFunction(pluginFunctionContext, functionCacheFactory.newCache()); this.functionCacheFactory = functionCacheFactory; - JavaRDD> rdd = sourceFactory - .createRDD(sec, jsc, stageSpec.getName(), Object.class, Object.class) - .flatMap(sourceFunction); - if (shouldForceDatasets) { - return OpaqueDatasetCollection.fromRdd( - rdd, sec, jsc, new SQLContext(jsc), datasetContext, 
sinkFactory, functionCacheFactory); + try { + JavaRDD> rdd = sourceFactory + .createRDD(sec, jsc, stageSpec.getName(), Object.class, Object.class) + .flatMap(sourceFunction); + if (shouldForceDatasets) { + return OpaqueDatasetCollection.fromRdd( + rdd, sec, jsc, new SQLContext(jsc), datasetContext, sinkFactory, functionCacheFactory); + } + return new RDDCollection<>(sec, functionCacheFactory, jsc, + new SQLContext(jsc), datasetContext, sinkFactory, rdd); + } catch (Exception e) { + List causalChain = Throwables.getCausalChain(e); + for(Throwable cause : causalChain) { + if (cause instanceof WrappedException) { + String stageName = ((WrappedException) cause).getStageName(); + LOG.error("Stage: {}", stageName); + MDC.put("Failed_Stage", stageName); + throw new WrappedException(e, stageName); + } + } + MDC.put("Failed_Stage", stageSpec.getName()); + throw new WrappedException(e, stageSpec.getName()); } - return new RDDCollection<>(sec, functionCacheFactory, jsc, - new SQLContext(jsc), datasetContext, sinkFactory, rdd); } @Override diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java index b25c700b87dd..e235513186fe 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.DatasetManagementException; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.WrappedException; import io.cdap.cdap.api.spark.JavaSparkExecutionContext; import io.cdap.cdap.api.spark.sql.DataFrames; import io.cdap.cdap.etl.api.Alert; @@ -77,8 +78,10 @@ import org.apache.spark.storage.StorageLevel; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import scala.Tuple2; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -270,9 +273,24 @@ public Runnable createStoreTask(StageSpec stageSpec, PairFlatMapFunction sinkRDD = rdd.flatMapToPair(sinkFunction); - for (String outputName : sinkFactory.writeFromRDD(sinkRDD, sec, stageSpec.getName())) { - recordLineage(outputName); + try { + JavaPairRDD sinkRDD = rdd.flatMapToPair(sinkFunction); + for (String outputName : sinkFactory.writeFromRDD(sinkRDD, sec, stageSpec.getName())) { + recordLineage(outputName); + } + } catch (Exception e) { + List causalChain = Throwables.getCausalChain(e); + for(Throwable cause : causalChain) { + if (cause instanceof WrappedException) { + String stageName = ((WrappedException) cause).getStageName(); + MDC.put("Failed_Stage", stageName); + LOG.error("Stage: {}", stageName); + throw new WrappedException(e, stageName); + } + } + MDC.put("Failed_Stage", stageSpec.getName()); + LOG.error("Stage: {}", stageSpec.getName()); + throw new WrappedException(e, stageSpec.getName()); } } }; diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDUtils.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDUtils.java index 74ce0ef98808..706b9befaa49 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDUtils.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDUtils.java @@ -16,12 +16,12 @@ package io.cdap.cdap.etl.spark.batch; -import com.google.common.base.Throwables; import io.cdap.cdap.api.data.batch.InputFormatProvider; import io.cdap.cdap.api.data.batch.OutputFormatProvider; +import io.cdap.cdap.api.exception.WrappedException; +import io.cdap.cdap.etl.batch.WrappedInputFormat; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.security.UserGroupInformation; import org.apache.spark.api.java.JavaPairRDD; @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Map; +import javax.annotation.Nullable; /** * Common RDD operations @@ -60,21 +61,22 @@ public static void saveHadoopDataset(JavaPairRDD rdd, Configuration @SuppressWarnings("unchecked") public static JavaPairRDD readUsingInputFormat(JavaSparkContext jsc, - InputFormatProvider inputFormatProvider, - ClassLoader classLoader, - Class keyClass, Class valueClass) { + InputFormatProvider inputFormatProvider, ClassLoader classLoader, + Class keyClass, Class valueClass, @Nullable String stageName) { Configuration hConf = new Configuration(); hConf.clear(); for (Map.Entry entry : inputFormatProvider.getInputFormatConfiguration().entrySet()) { hConf.set(entry.getKey(), entry.getValue()); } try { + // Instantiate the wrapper class, passing the necessary parameters @SuppressWarnings("unchecked") - Class inputFormatClass = (Class) classLoader.loadClass( - inputFormatProvider.getInputFormatClassName()); - return jsc.newAPIHadoopRDD(hConf, inputFormatClass, keyClass, valueClass); - } catch (ClassNotFoundException e) { - throw Throwables.propagate(e); + WrappedInputFormat wrappedInputFormat = new WrappedInputFormat<>(classLoader, + inputFormatProvider.getInputFormatClassName(), stageName); + + return jsc.newAPIHadoopRDD(hConf, wrappedInputFormat.getClass(), keyClass, valueClass); + } catch (IOException e) { + throw new WrappedException(e, stageName); } } diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceFactory.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceFactory.java index e3b14bc18d59..f7bb386cb78b 100644 --- 
a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceFactory.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkBatchSourceFactory.java @@ -144,24 +144,27 @@ public JavaPairRDD createRDD(JavaSparkExecutionContext sec, JavaSpa } Iterator inputsIter = inputNames.iterator(); - JavaPairRDD inputRDD = createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass); + JavaPairRDD inputRDD = createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass, + sourceName); while (inputsIter.hasNext()) { - inputRDD = inputRDD.union(createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass)); + inputRDD = inputRDD.union(createInputRDD(sec, jsc, inputsIter.next(), keyClass, valueClass, + sourceName)); } return inputRDD; } @SuppressWarnings("unchecked") - private JavaPairRDD createInputRDD(JavaSparkExecutionContext sec, JavaSparkContext jsc, String inputName, - Class keyClass, Class valueClass) { + private JavaPairRDD createInputRDD(JavaSparkExecutionContext sec, + JavaSparkContext jsc, String inputName, Class keyClass, + Class valueClass, String sourceName) { if (inputFormatProviders.containsKey(inputName)) { InputFormatProvider inputFormatProvider = inputFormatProviders.get(inputName); ClassLoader classLoader = Objects.firstNonNull(currentThread().getContextClassLoader(), - getClass().getClassLoader()); + getClass().getClassLoader()); return RDDUtils.readUsingInputFormat(jsc, inputFormatProvider, classLoader, keyClass, - valueClass); + valueClass, sourceName); } if (datasetInfos.containsKey(inputName)) {