diff --git a/cdap-api-common/pom.xml b/cdap-api-common/pom.xml
index ada8e8f44227..59b7adede6e5 100644
--- a/cdap-api-common/pom.xml
+++ b/cdap-api-common/pom.xml
@@ -43,6 +43,12 @@
junit
junit
+
+ io.cdap.cdap
+ cdap-error-api
+ 6.11.0-SNAPSHOT
+ compile
+
diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java
new file mode 100644
index 000000000000..e947cbf0ee2e
--- /dev/null
+++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedException.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.api.exception;
+
+import io.cdap.cdap.error.api.ErrorTagProvider;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Sets the stage name in exception.
+ */
+public class WrappedException extends RuntimeException implements ErrorTagProvider {
+
+ private final String stageName;
+ private final Set errorTags = new HashSet<>();
+
+ public WrappedException(Throwable cause, String stageName) {
+ super(cause);
+ this.stageName = stageName;
+ this.errorTags.add(ErrorTag.PLUGIN);
+ }
+
+ public WrappedException(String message, String stageName) {
+ super(message);
+ this.stageName = stageName;
+ this.errorTags.add(ErrorTag.PLUGIN);
+ }
+
+ public WrappedException(Throwable cause, String message, String stageName) {
+ super(message, cause);
+ this.stageName = stageName;
+ this.errorTags.add(ErrorTag.PLUGIN);
+ }
+
+ public String getStageName() {
+ return stageName;
+ }
+
+ @Override
+ public Set getErrorTags() {
+ return Collections.unmodifiableSet(errorTags);
+ }
+}
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java
index bd2b44807ed7..1eaeadc3845b 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowProgramController.java
@@ -15,15 +15,19 @@
*/
package io.cdap.cdap.internal.app.runtime.workflow;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Service;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.internal.app.runtime.AbstractProgramController;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
+import java.util.List;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
*
@@ -87,6 +91,15 @@ public void terminated(Service.State from) {
@Override
public void failed(Service.State from, Throwable failure) {
+ List causalChain = Throwables.getCausalChain(failure);
+ for(Throwable cause : causalChain) {
+ if (cause instanceof WrappedException) {
+ String stageName = ((WrappedException) cause).getStageName();
+ LOG.error("Stage: {}", stageName);
+ MDC.put("Failed_Stage", stageName);
+ break;
+ }
+ }
LOG.error("Workflow service '{}' failed.", serviceName, failure);
error(failure);
}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java
new file mode 100644
index 000000000000..f368e865be57
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedInputFormat.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.batch;
+
+import io.cdap.cdap.api.exception.WrappedException;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class WrappedInputFormat extends InputFormat {
+ private final InputFormat inputFormat;
+ private final String stageName;
+
+ @Override
+ public List getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ try {
+ return inputFormat.getSplits(jobContext);
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public RecordReader createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ try {
+ return new WrappedRecordReader<>(inputFormat.createRecordReader(inputSplit,
+ taskAttemptContext), stageName);
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the delegating {@link InputFormat} based on the current configuration.
+ *
+ * @param classLoader the {@link ClassLoader} for loading input format
+ * @param inputFormatClassName the name of {@link InputFormat} class
+ * @throws IOException if failed to instantiate the input format class
+ */
+ public WrappedInputFormat(ClassLoader classLoader, String inputFormatClassName,
+ String stageName) throws IOException {
+ this.stageName = stageName;
+ if (inputFormatClassName == null) {
+ throw new IllegalArgumentException("Missing configuration for the InputFormat to use");
+ }
+ if (inputFormatClassName.equals(getClass().getName())) {
+ throw new IllegalArgumentException("Cannot delegate InputFormat to the same class");
+ }
+ try {
+ //noinspection unchecked
+ @SuppressWarnings("unchecked")
+ Class> inputFormatClass = (Class>) classLoader.loadClass(
+ inputFormatClassName);
+ this.inputFormat = inputFormatClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new IOException(
+ String.format("Unable to instantiate delegate input format %s", inputFormatClassName), e);
+ }
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java
new file mode 100644
index 000000000000..36379afa8b73
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/batch/WrappedRecordReader.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.etl.batch;
+
+import io.cdap.cdap.api.exception.WrappedException;
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class WrappedRecordReader extends RecordReader {
+ private final RecordReader recordReader;
+ private final String stageName;
+
+ public WrappedRecordReader(RecordReader recordReader, String stageName) {
+ this.recordReader = recordReader;
+ this.stageName = stageName;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ try {
+ recordReader.initialize(inputSplit, taskAttemptContext);
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ try {
+ return recordReader.nextKeyValue();
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public K getCurrentKey() throws IOException, InterruptedException {
+ try {
+ return recordReader.getCurrentKey();
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public V getCurrentValue() throws IOException, InterruptedException {
+ try {
+ return recordReader.getCurrentValue();
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ try {
+ return recordReader.getProgress();
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ recordReader.close();
+ } catch (Exception e) {
+ if (stageName != null) {
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
+ }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java
index 5c7861f276fb..1a460f2f25f6 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/StageLoggingCaller.java
@@ -33,6 +33,10 @@ private StageLoggingCaller(Caller delegate, String stageName) {
this.stageName = stageName;
}
+ public String getStageName() {
+ return stageName;
+ }
+
@Override
public T call(Callable callable) throws Exception {
MDC.put(Constants.MDC_STAGE_KEY, stageName);
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java
index 29d1a3335e0f..891a7443b9bb 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedAction.java
@@ -16,10 +16,14 @@
package io.cdap.cdap.etl.common.plugin;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Wrapper around {@link Action} that makes sure logging, classloading, and other pipeline
@@ -29,6 +33,7 @@ public class WrappedAction extends Action implements PluginWrapper {
private final Action action;
private final Caller caller;
+ private static final Logger LOG = LoggerFactory.getLogger(WrappedAction.class);
public WrappedAction(Action action, Caller caller) {
this.action = action;
@@ -45,10 +50,20 @@ public void configurePipeline(final PipelineConfigurer pipelineConfigurer) {
@Override
public void run(final ActionContext context) throws Exception {
- caller.call((Callable) () -> {
- action.run(context);
- return null;
- });
+ try {
+ caller.call((Callable) () -> {
+ action.run(context);
+ return null;
+ });
+ } catch(Exception e) {
+ if (caller instanceof StageLoggingCaller) {
+ String stageName = ((StageLoggingCaller) caller).getStageName();
+ MDC.put("Failed_Stage", stageName);
+ LOG.error("Stage: {}", stageName);
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
+ }
}
@Override
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java
index 18e84fe419b7..dd0904796c64 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSink.java
@@ -17,6 +17,7 @@
package io.cdap.cdap.etl.common.plugin;
import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
@@ -24,6 +25,9 @@
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Wrapper around {@link BatchSource} that makes sure logging, classloading, and other pipeline
@@ -40,6 +44,7 @@ public class WrappedBatchSink
private final BatchSink batchSink;
private final Caller caller;
private final OperationTimer operationTimer;
+ private static final Logger LOG = LoggerFactory.getLogger(WrappedBatchSink.class);
public WrappedBatchSink(BatchSink batchSink, Caller caller,
OperationTimer operationTimer) {
@@ -73,6 +78,14 @@ public void transform(IN input,
batchSink.transform(input, new UntimedEmitter<>(emitter, operationTimer));
return null;
});
+ } catch(Exception e) {
+ if (caller instanceof StageLoggingCaller) {
+ String stageName = ((StageLoggingCaller) caller).getStageName();
+ MDC.put("Failed_Stage", stageName);
+ LOG.error("Stage: {}", stageName);
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
} finally {
operationTimer.reset();
}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java
index b2129a24c93f..0d1231934e41 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedBatchSource.java
@@ -17,12 +17,16 @@
package io.cdap.cdap.etl.common.plugin;
import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Wrapper around {@link BatchSource} that makes sure logging, classloading, and other pipeline
@@ -39,6 +43,7 @@ public class WrappedBatchSource
private final BatchSource batchSource;
private final Caller caller;
private final OperationTimer operationTimer;
+ private static final Logger LOG = LoggerFactory.getLogger(WrappedBatchSource.class);
public WrappedBatchSource(BatchSource batchSource,
Caller caller, OperationTimer operationTimer) {
@@ -71,6 +76,14 @@ public void transform(KeyValue input, Emitter emitter) thro
batchSource.transform(input, new UntimedEmitter<>(emitter, operationTimer));
return null;
});
+ } catch(Exception e) {
+ if (caller instanceof StageLoggingCaller) {
+ String stageName = ((StageLoggingCaller) caller).getStageName();
+ MDC.put("Failed_Stage", stageName);
+ LOG.error("Stage: {}", stageName);
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
} finally {
operationTimer.reset();
}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java
index ef7408d0db3c..e28ddb6f14a2 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/WrappedTransform.java
@@ -16,12 +16,16 @@
package io.cdap.cdap.etl.common.plugin;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Wrapper around a {@link Transform} that makes sure logging, classloading, and other pipeline
@@ -36,6 +40,7 @@ public class WrappedTransform extends Transform implements
private final Transform transform;
private final Caller caller;
private final OperationTimer operationTimer;
+ private static final Logger LOG = LoggerFactory.getLogger(WrappedTransform.class);
public WrappedTransform(Transform transform, Caller caller,
OperationTimer operationTimer) {
@@ -92,6 +97,14 @@ public void transform(IN input, Emitter emitter) throws Exception {
transform.transform(input, new UntimedEmitter<>(emitter, operationTimer));
return null;
});
+ } catch(Exception e) {
+ if (caller instanceof StageLoggingCaller) {
+ String stageName = ((StageLoggingCaller) caller).getStageName();
+ MDC.put("Failed_Stage", stageName);
+ LOG.error("Stage: {}", stageName);
+ throw new WrappedException(e, stageName);
+ }
+ throw e;
} finally {
operationTimer.reset();
}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java
index 72b624ff1034..2d482f1c23b2 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/validation/LoggingFailureCollector.java
@@ -25,6 +25,7 @@
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Failure collector that logs the failures.
@@ -32,6 +33,7 @@
public class LoggingFailureCollector extends DefaultFailureCollector {
private static final Logger LOG = LoggerFactory.getLogger(LoggingFailureCollector.class);
+ private String stageName;
/**
* Failure collector that logs the failures.
@@ -41,6 +43,7 @@ public class LoggingFailureCollector extends DefaultFailureCollector {
*/
public LoggingFailureCollector(String stageName, Map inputSchemas) {
super(stageName, inputSchemas);
+ this.stageName = stageName;
}
@Override
@@ -57,7 +60,9 @@ public ValidationException getOrThrowException() throws ValidationException {
}
List failures = validationException.getFailures();
- LOG.error("Encountered '{}' validation failures: {}{}", failures.size(), System.lineSeparator(),
+ MDC.put("Failed_Stage", stageName);
+ LOG.error("Encountered '{}' validation failures in stage {}: {}{}", stageName, failures.size(),
+ System.lineSeparator(),
IntStream.range(0, failures.size())
.mapToObj(
index -> String.format("%d. %s", index + 1, failures.get(index).getFullMessage()))
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java
index 6349970d4f79..abab027eba00 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSQLEngineAdapter.java
@@ -347,7 +347,7 @@ private BatchCollectionFactory pullInternal(SQLDataset dataset) throws SQ
ClassLoader classLoader = Objects.firstNonNull(Thread.currentThread().getContextClassLoader(),
getClass().getClassLoader());
JavaPairRDD pairRDD = RDDUtils.readUsingInputFormat(jsc, sqlPullDataset, classLoader, Object.class,
- Object.class);
+ Object.class, null);
JavaRDD rdd = pairRDD.flatMap(new TransformFromPairFunction(sqlPullDataset.fromKeyValue()))
.map(f -> {
return f;
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
index f3cc3e25e797..a7441344dd1b 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
@@ -17,6 +17,7 @@
package io.cdap.cdap.etl.spark.batch;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
@@ -26,6 +27,7 @@
import io.cdap.cdap.api.data.DatasetContext;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.WrappedException;
import io.cdap.cdap.api.macro.MacroEvaluator;
import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
import io.cdap.cdap.api.spark.JavaSparkMain;
@@ -68,6 +70,7 @@
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import scala.Tuple2;
import java.io.BufferedReader;
@@ -149,15 +152,29 @@ protected SparkCollection> getSource(StageSpec stageSpec,
FlatMapFunction, RecordInfo