Skip to content

Commit

Permalink
Error Management for File plugin Source/Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 20, 2025
1 parent dd1f03b commit d25f116
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.batch.source.FileErrorDetailsProvider;
import io.cdap.plugin.format.plugin.AbstractFileSink;
import io.cdap.plugin.format.plugin.AbstractFileSinkConfig;

Expand All @@ -48,6 +49,11 @@ public FileSink(Conf config) {
this.config = config;
}

@Override
protected String getErrorDetailsProviderClassName() {
return FileErrorDetailsProvider.class.getName();
}

@Override
protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
return config.getFSProperties();
Expand Down Expand Up @@ -97,7 +103,8 @@ private Map<String, String> getFSProperties() {
try {
return GSON.fromJson(fileSystemProperties, MAP_TYPE);
} catch (JsonSyntaxException e) {
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
throw new IllegalArgumentException(
String.format("Unable to parse filesystem properties: %s", e.getMessage()), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public FileBatchSource(FileSourceConfig config) {
this.config = config;
}

@Override
protected String getErrorDetailsProviderClassName() {
return FileErrorDetailsProvider.class.getName();
}

@Override
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
Map<String, String> properties = new HashMap<>(config.getFileSystemProperties());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.batch.source;

import io.cdap.plugin.common.HydratorErrorDetailsProvider;

/**
* FileErrorDetails provider
*/
public class FileErrorDetailsProvider extends HydratorErrorDetailsProvider {

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Map<String, String> getFileSystemProperties() {
try {
return GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
throw new IllegalArgumentException(String.format("Unable to parse filesystem properties: %s", e.getMessage()), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.common;

import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.util.List;
import javax.annotation.Nullable;

/**
* Error details provided for the Snowflake
**/
public class HydratorErrorDetailsProvider implements ErrorDetailsProvider {

private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";

@Nullable
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext, ErrorType.USER);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error
* information from {@link Exception}.
*
* @param exception The Exception to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(Exception exception, ErrorContext errorContext,
ErrorType errorType) {
String errorMessage = exception.getMessage();
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage,
String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType, false, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -124,11 +125,16 @@ public void prepareRun(BatchSinkContext context) throws Exception {
Map<String, String> outputProperties = new HashMap<>(validatingOutputFormat.getOutputFormatConfiguration());
outputProperties.putAll(getFileSystemProperties(context));
outputProperties.put(FileOutputFormat.OUTDIR, getOutputDir(context));
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
context.addOutput(Output.of(config.getReferenceName(),
new SinkOutputFormatProvider(validatingOutputFormat.getOutputFormatClassName(),
outputProperties)));
}

protected String getErrorDetailsProviderClassName() {
return null;
}

protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
String fileFormat = config.getFormatName();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.common.LineageRecorder;
Expand Down Expand Up @@ -223,10 +223,14 @@ public void prepareRun(BatchSourceContext context) throws Exception {
for (Map.Entry<String, String> entry : getFileSystemProperties(context).entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}

context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf)));
}

protected String getErrorDetailsProviderClassName() {
return null;
}

protected ValidatingInputFormat getInputFormatForRun(BatchSourceContext context)
throws InstantiationException {
String fileFormat = config.getFormatName();
Expand Down

0 comments on commit d25f116

Please sign in to comment.