diff --git a/.github/workflows/jacoco_check.yml b/.github/workflows/jacoco_check.yml
index a47cb934..2f232b38 100644
--- a/.github/workflows/jacoco_check.yml
+++ b/.github/workflows/jacoco_check.yml
@@ -1,5 +1,5 @@
 #
-# Copyright 2023 ABSA Group Limited
+# Copyright 2021 ABSA Group Limited
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index e15fbddb..2f418b92 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -1,5 +1,5 @@
 #
-# Copyright 2022 ABSA Group Limited
+# Copyright 2021 ABSA Group Limited
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/README.md b/README.md
index d3dda351..88d988cb 100644
--- a/README.md
+++ b/README.md
@@ -411,7 +411,24 @@ path even of nested fields. It also evaluates arrays and maps where the array in
 def col_of_path(fullColName: String): Column
 ```
 
+2. Provides a column of NULL values.
+
+   ```scala
+   def null_col: Column
+   ```
+
+3. Provides a column of NULL values, but the actual type is per specification.
+
+   ```scala
+   def null_col(dataType: DataType): Column
+   ```
+
+## Error Handling
+
+A `trait` and a set of supporting classes and other traits to enable error channeling between libraries and
+applications during Spark data processing.
+
 ## Spark Commons Test
 
 ### Usage:
diff --git a/build.sbt b/build.sbt
index fe8cc107..45ca617b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -36,6 +36,15 @@ lazy val commonSettings = Seq(
   Test / parallelExecution := false
 )
 
+/**
+ * Add "za.co.absa.spark.commons.utils.ExplodeTools" to exclude a class,
+ * or "za.co.absa.spark.commons.utils.JsonUtils*" to exclude the class and all related objects
+ */
+lazy val commonJacocoExcludes: Seq[String] = Seq(
+  "za.co.absa.spark.commons.adapters.CallUdfAdapter",
+  "za.co.absa.spark.commons.adapters.TransformAdapter"
+)
+
 lazy val parent = (project in file("."))
   .aggregate(sparkCommons.projectRefs ++ sparkCommonsTest.projectRefs: _*)
   .settings(
diff --git a/project/JacocoSetup.scala b/project/JacocoSetup.scala
index 32c4da63..626ea68f 100644
--- a/project/JacocoSetup.scala
+++ b/project/JacocoSetup.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/OncePerSparkSession.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/OncePerSparkSession.scala
index dcec715d..dc1f9e4d 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/OncePerSparkSession.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/OncePerSparkSession.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
  * instantiated classes thus not running the method [[register]] again on them.
  *
  * Usage: extend this abstract class and implement the method [[register]]. On initialization the
- * [[register]] method gets called by the [[za.co.absa.spark.commons.OncePerSparkSession$.registerMe]] method if the class + spark session
+ * [[register]] method gets called by the [[za.co.absa.spark.commons.OncePerSparkSession$.registerMe OncePerSparkSession.registerMe()]] method if the class + spark session
 * combination is unique.
 * If it is not unique, [[register]] will not get called again.
 * This way we ensure only a single registration per spark session.
 *
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorHandling.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorHandling.scala
index 4acd99c2..8e512e37 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorHandling.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorHandling.scala
@@ -16,29 +16,145 @@
 package za.co.absa.spark.commons.errorhandling
 
+import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression}
+import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Column, DataFrame}
 import za.co.absa.spark.commons.errorhandling.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
 import za.co.absa.spark.commons.errorhandling.types._
 
+/**
+ * The base trait of the error handling component. Every library that wants to use the component during Spark data
+ * processing should utilize this trait and its methods. The methods serve to record the errors and attach them to the
+ * [[org.apache.spark.sql.DataFrame spark.DataFrame]]. The trait should be an input parameter of such a library, perhaps as an implicit.
+ * On the other side, the end application provides a concrete [[ErrorHandling]] implementation that does the actual error
+ * handling as the application desires.
+ * For ease of use, and as examples, a few general implementations are provided in the `implementations` sub-folder.
+ * Also, for common, repeated parts of implementations, the folder `partials` offers some traits.
+ */
 trait ErrorHandling {
-  def putErrorToColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn
+  /**
+   * The first of the few methods that need to be implemented in the concrete incarnation of the trait.
+   * The purpose of this method is to convert the error specification into a [[org.apache.spark.sql.Column spark.Column]] expression.
+   * @param errorMessageSubmit - the error specification
+   * @return - the error specification transformed into a column expression
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column
 
-  def aggregateErrorColumns(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame
+  /**
+   * Applies the provided columns to the incoming [[org.apache.spark.sql.DataFrame spark.DataFrame]]. Usually they would be aggregated in some way and attached
+   * to the [[org.apache.spark.sql.DataFrame spark.DataFrame]], but any other operation is imaginable. Unless the implementation is really bent, the incoming
+   * columns are those produced by [[transformErrorSubmitToColumn]].
+   * The idea here is that the error column contains the information of the error that occurred on the row, or is empty (NULL)
+   * otherwise.
+   * In each implementation, applying the function to each column separately or to any grouping of the columns should produce
+   * the same result (with the exception of the order of errors in the aggregation).
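+   * To illustrate that property, a hypothetical implementation that unions the error structs into one array column
+   * named `errors` (both the column name and its prior existence are assumptions of this sketch) would qualify,
+   * since the unions can happen per column or in any grouping with the same outcome:
+   * {{{
+   *   protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame =
+   *     dataFrame.withColumn("errors", array_union(col("errors"), array_except(array(errCols: _*), array(null_col))))
+   * }}}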
+   * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to apply the error columns to
+   * @param errCols - the list of error columns to apply
+   * @return - the data frame with the error columns applied (aggregated and attached, or processed otherwise)
+   */
+  protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame
 
+  /**
+   * The idea of this function is: "Put the specified error to the provided data frame if the condition is true on the row."
+   * The error is transformed to a column using the [[transformErrorSubmitToColumn]] method and applied to the data frame
+   * if the "when" condition is true, using the [[doApplyErrorColumnsToDataFrame]] method.
+   * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
+   * @param when - the condition that defines that the error occurred on the row
+   * @param errorMessageSubmit - the detected error specification
+   * @return - the original [[org.apache.spark.sql.DataFrame spark.DataFrame]] with the error detection applied
+   * @group Error Handling
+   * @since 0.6.0
+   */
   def putError(dataFrame: DataFrame)(when: Column)(errorMessageSubmit: ErrorMessageSubmit): DataFrame = {
     putErrorsWithGrouping(dataFrame)(Seq(ErrorWhen(when, errorMessageSubmit)))
   }
 
-  def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame
 
-  def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
-    val toSubmit = errCol
+  /**
+   * Same as [[putError]], but allows a series of condition-error pairs to be specified at once.
+   * It should be noted that once an error has been identified for a field on the row, no more conditions bound to that
+   * field are evaluated.
+   * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
+   * @param errorsWhen - the list of condition-error pairs; the conditions are grouped by the fields of the error submissions
+   * @return - the original data frame with the error detection applied
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame = {
+    def errorWhenToCol(errorWhen: ErrorWhen): Column = {
+      when(errorWhen.when, transformErrorSubmitToColumn(errorWhen.errorMessageSubmit))
+    }
+    def errorWhenSeqToCol(errorsWhen: Seq[ErrorWhen]): Column = {
+      val branches: Seq[(Expression, Expression)] = errorsWhen.map(errorWhen => (errorWhen.when.expr, transformErrorSubmitToColumn(errorWhen.errorMessageSubmit).expr))
+      new Column(CaseWhen(branches))
+    }
+
+    val errorsByColumn = errorsWhen.groupBy(_.errorMessageSubmit.errColsAndValues.columnNames)
+    val noColNames = Set.empty[String]
+    val errorColumns1 = errorsByColumn.getOrElse(noColNames, Seq.empty).map(errorWhenToCol) // no grouping without ErrCol names
+    val errorColumns2 = (errorsByColumn - noColNames).values.map(errorWhenSeqToCol).toSeq
+    doApplyErrorColumnsToDataFrame(dataFrame, errorColumns1 ++ errorColumns2: _*)
+  }
+
+  /**
+   * Transforms error information into a column expression, for cases when the simple column expression condition used in
+   * [[putError]] is not suitable for whatever reason.
+   * The returned [[types.ErrorColumn]] should then be used in [[applyErrorColumnsToDataFrame]].
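+   * A typical usage sketch (the `errorHandling` and `df` values are illustrative only):
+   * {{{
+   *   val error: ErrorColumn = errorHandling.createErrorAsColumn(errorMessageSubmit)
+   *   // ... potentially collect more ErrorColumns ...
+   *   val dfWithErrors = errorHandling.applyErrorColumnsToDataFrame(df)(error)
+   * }}}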
+   * @param errorMessageSubmit - the error specification
+   * @return - [[types.ErrorColumn]] expression containing the error specification
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn = {
+    ErrorColumn(transformErrorSubmitToColumn(errorMessageSubmit))
+  }
+
+  /**
+   * Same as the other [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
+   * in a decomposed state, not in the [[ErrorMessageSubmit]] trait form.
+   * @param errType - a word description of the type of the error
+   * @param errCode - a numeric designation of the type of the error
+   * @param errMessage - a human-friendly description of the error
+   * @param errSourceColName - the name of the column the error happened at
+   * @param additionalInfo - any optional additional info in JSON format
+   * @return - [[types.ErrorColumn]] expression containing the error specification
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def createErrorAsColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
+    val toSubmit = errSourceColName
       .map(errSourceColName => ErrorMessageSubmitOnColumn(errType, errCode, errMessage, errSourceColName, additionalInfo))
       .getOrElse(ErrorMessageSubmitWithoutColumn(errType, errCode, errMessage, additionalInfo))
-    putErrorToColumn(toSubmit)
+    createErrorAsColumn(toSubmit)
+  }
+
+
+  /**
+   * Applies the earlier collected [[types.ErrorColumn ErrorColumns]] to the provided [[org.apache.spark.sql.DataFrame spark.DataFrame]].
+   * See [[doApplyErrorColumnsToDataFrame]] for a detailed functional explanation.
+   * @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
+   * @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn]]
+   * @return - the original data frame with the error detection applied
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def applyErrorColumnsToDataFrame(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame = {
+    doApplyErrorColumnsToDataFrame(dataFrame, errCols.map(_.column): _*)
   }
 
+  /**
+   * Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+   * This function provides the information on the structure of a single error column.
+   * @return - the `DataType` of the column returned from the `createErrorAsColumn` function
+   */
   def errorColumnType: DataType
 
-  def errorColumnAggregationType: Option[DataType]
+  /**
+   * Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+   * This function describes the type of the column attached (if it didn't exist before) to the [[org.apache.spark.sql.DataFrame DataFrame]].
+   * @return - the `DataType` of the column containing the error info that is attached to the [[org.apache.spark.sql.DataFrame DataFrame]].
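+   * For example, [[za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageArray ErrorMessageArray]] returns
+   * `Option(ArrayType(errorColumnType, containsNull = false))`, while
+   * [[za.co.absa.spark.commons.errorhandling.implementations.ErrorHandlingFilterRowsWithErrors ErrorHandlingFilterRowsWithErrors]], which attaches no column, returns `None`.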
+   */
+  def dataFrameColumnType: Option[DataType]
 }
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessageSubmit.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessageSubmit.scala
index f4ffa6c9..84e4a73a 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessageSubmit.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/ErrorMessageSubmit.scala
@@ -18,10 +18,15 @@ package za.co.absa.spark.commons.errorhandling
 
 import za.co.absa.spark.commons.errorhandling.types._
 
+/**
+ * Trait collecting an error definition in a format usable during Spark data processing.
+ * @group Error Handling
+ * @since 0.6.0
+ */
 trait ErrorMessageSubmit {
   def errType: ColumnOrValue[ErrType]
   def errCode: ColumnOrValue[ErrCode]
-  def errMsg: ColumnOrValue[ErrMsg]
+  def errMessage: ColumnOrValue[ErrMsg]
   def errColsAndValues: ColumnOrValue[ErrColsAndValues]
-  def additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
+  def additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
 }
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala
index 11406e3e..4b7cf998 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala
@@ -19,20 +19,19 @@ package za.co.absa.spark.commons.errorhandling.implementations
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.functions.{coalesce, lit}
 import org.apache.spark.sql.types.{BooleanType, DataType}
-import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
-import za.co.absa.spark.commons.errorhandling.partials.ErrorHandlingCommon
+import za.co.absa.spark.commons.errorhandling.{ErrorHandling, ErrorMessageSubmit}
 
 /**
 * Class implements the functionality of filtering rows that have some error (any of the error columns is not NULL).
 */
-object ErrorHandlingFilterRowsWithErrors extends ErrorHandlingCommon {
+object ErrorHandlingFilterRowsWithErrors extends ErrorHandling {
   /**
   * Creates a column with the error description; in this particular case it actually only signals with a boolean flag that there was an error in the row.
   * @param errorMessageSubmit - the description of the error
   * @return - a column with a boolean value indicating there was an error on the row.
  */
-  override protected def evaluate(errorMessageSubmit: ErrorMessageSubmit): Column = {
+  override protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column = {
     lit(true)
   }
 
@@ -42,18 +41,22 @@ object ErrorHandlingFilterRowsWithErrors extends ErrorHandlingCommon {
   * @param errCols - the error columns to signal if the row should be filtered or not
   * @return - the dataframe without the rows with errors
   */
-  override protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
+  override protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame = {
     val columns: Seq[Column] = errCols :+ lit(false)
     dataFrame.filter(!coalesce(columns: _*))
   }
 
   /**
-   * @return BooleanType of Datatype object
+   * Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+   * This function provides the information on the structure of a single error column.
+   * @return - `BooleanType`, as all that is needed is a `true` flag when an error is present
   */
   val errorColumnType: DataType = BooleanType
 
   /**
-   * @return None since no error-related column added during the aggregation
+   * Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured.
+   * This function describes the type of the column attached (if it didn't exist before) to the [[org.apache.spark.sql.DataFrame DataFrame]].
+   * @return - `None`, since no error-related column is added
   */
-  val errorColumnAggregationType: Option[DataType] = None
+  val dataFrameColumnType: Option[DataType] = None
 }
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArray.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArray.scala
index b590104b..4b52f56c 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArray.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArray.scala
@@ -20,14 +20,21 @@ import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.functions.{array, array_except, array_union, col, map_from_arrays, map_keys, map_values, struct, when}
 import org.apache.spark.sql.types.{ArrayType, DataType}
 import za.co.absa.spark.commons.adapters.TransformAdapter
-import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._
-import za.co.absa.spark.commons.errorhandling.partials.{ErrorHandlingCommon, EvaluateIntoErrorMessage}
+import za.co.absa.spark.commons.errorhandling.ErrorHandling
+import za.co.absa.spark.commons.errorhandling.partials.TransformIntoErrorMessage.FieldNames._
+import za.co.absa.spark.commons.errorhandling.partials.TransformIntoErrorMessage
 import za.co.absa.spark.commons.sql.functions.null_col
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 
+/**
+ * An implementation of [[ErrorHandling]] that collects errors into columns of structs based on the [[za.co.absa.spark.commons.errorhandling.ErrorMessage ErrorMessage]] case class.
+ * Upon applying, the non-NULL columns are aggregated into an array column which is attached to the [[org.apache.spark.sql.DataFrame spark.DataFrame]].
+ * In case the column already exists in the DataFrame, the errors are appended to the existing column.
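+ * A usage sketch (the column name, condition and error values are illustrative only):
+ * {{{
+ *   val errorHandling = ErrorMessageArray("errCol")
+ *   val dfWithErrors = errorHandling.putError(df)(col("id").isNull)(
+ *     ErrorMessageSubmitOnColumn("NULL_ID", 1, "Column id must not be NULL", "id")
+ *   )
+ * }}}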
+ * @param errorColumnName - the name of the array column aggregating all the errors + */ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.defaultErrorColumnName) - extends ErrorHandlingCommon - with EvaluateIntoErrorMessage + extends ErrorHandling + with TransformIntoErrorMessage with TransformAdapter { private def decomposeMap(errorMessageColumn: Column): Column = { @@ -36,7 +43,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default errorMessageColumn.getField(errType) as errType, errorMessageColumn.getField(errCode) as errCode, errorMessageColumn.getField(errMsg) as errMsg, - map_keys(errorMessageColumn.getField(errColsAndValues)) as errCols, + map_keys(errorMessageColumn.getField(errColsAndValues)) as errSourceCols, map_values(errorMessageColumn.getField(errColsAndValues)) as errValues, errorMessageColumn.getField(additionInfo) as additionInfo ) @@ -48,7 +55,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default errorMessageColumn.getField(errType) as errType, errorMessageColumn.getField(errCode) as errCode, errorMessageColumn.getField(errMsg) as errMsg, - map_from_arrays(errorMessageColumn.getField(errCols), errorMessageColumn.getField(errValues)) as errColsAndValues, + map_from_arrays(errorMessageColumn.getField(errSourceCols), errorMessageColumn.getField(errValues)) as errColsAndValues, errorMessageColumn.getField(additionInfo) as additionInfo ) } @@ -60,7 +67,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default dataFrame.withColumn(errorColName, reMap(array_union(deMap(col(errorColName)), colToUnion))) } - protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = { + protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame = { val aggregated = array(errCols.map(decomposeMap): _*) //need to decompose the map field, as it's not supported in array functions val aggregatedWithoutNulls = array_except(aggregated, array(null_col)) val joinToExisting: (DataFrame, String) => DataFrame = appendToErrCol(_, _, aggregatedWithoutNulls) @@ -68,10 +75,11 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default } /** - * errorColumnAggregationType returns an array of errorColumnType - * @return option type of ArrayType for added column + * Provides the library some information about how the actual implementation of [[ErrorHandling]] is structured. 
+   * This function describes the type of the column attached (if it didn't exist before) to the [[org.apache.spark.sql.DataFrame DataFrame]].
+   * @return - the [[za.co.absa.spark.commons.errorhandling.ErrorHandling.errorColumnType ErrorHandling.errorColumnType]] aggregated into an `ArrayType`
   */
-  override def errorColumnAggregationType: Option[DataType] = {
+  override def dataFrameColumnType: Option[DataType] = {
     Option(ArrayType(errorColumnType, containsNull = false))
   }
 
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValue.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValue.scala
index b71f9078..3ac4d3ee 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValue.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValue.scala
@@ -22,12 +22,23 @@ import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
 import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitJustErrorValue.noColumnKey
 import za.co.absa.spark.commons.errorhandling.types._
 
+/**
+ * [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]] subclass to represent an error not
+ * bound to a particular column but still having a value that caused the error.
+ * @param errType - error type
+ * @param errCode - error code
+ * @param errMessage - error message
+ * @param errValue - the value that caused the error
+ * @param additionalInfo - optional additional info in form of JSON
+ * @group Error Handling
+ * @since 0.6.0
+ */
 class ErrorMessageSubmitJustErrorValue(
   val errType: ColumnOrValue[ErrType],
   val errCode: ColumnOrValue[ErrCode],
-  val errMsg: ColumnOrValue[ErrMsg],
+  val errMessage: ColumnOrValue[ErrMsg],
   errValue: ColumnOrValue[ErrValue],
-  override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
+  override val additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
 ) extends ErrorMessageSubmit {
   val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(map(lit(noColumnKey), errValue.column.cast(StringType)))
 }
 
@@ -35,7 +46,18 @@ object ErrorMessageSubmitJustErrorValue {
   val noColumnKey: ErrSourceColName = ""
 
-  def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: ErrValue, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitJustErrorValue = {
+  /**
+   * Convenient apply function
+   * @param errType - error type
+   * @param errCode - error code
+   * @param errMessage - error message
+   * @param errValue - the value that caused the error
+   * @param additionalInfo - optional additional info in form of JSON
+   * @return - instance of [[ErrorMessageSubmitJustErrorValue]]
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitJustErrorValue = {
     new ErrorMessageSubmitJustErrorValue(
       ColumnOrValue.withValue(errType),
       ColumnOrValue.withValue(errCode),
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumn.scala
b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumn.scala index 4d7ebe6e..75bd8c38 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumn.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumn.scala @@ -18,23 +18,46 @@ package za.co.absa.spark.commons.errorhandling.implementations.submits import za.co.absa.spark.commons.errorhandling.types._ +/** + * [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]] subclass to represent an error bound to exactly one + * column. + * @param errType - error type + * @param errCode - error code + * @param errMessage - error message + * @param errSourceColName - the name of the column the error was detected on + * @param additionalInfo - optional additional info in form of JSON + * @group Error Handling + * @since 0.6.0 + */ class ErrorMessageSubmitOnColumn ( errType: ColumnOrValue[ErrType], errCode: ColumnOrValue[ErrCode], - errMsg: ColumnOrValue[ErrMsg], - errColName: ErrSourceColName, - override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty - ) extends ErrorMessageSubmitOnMoreColumns(errType, errCode, errMsg, Set(errColName), additionInfo) { + errMessage: ColumnOrValue[ErrMsg], + errSourceColName: ErrSourceColName, + override val additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty + ) extends ErrorMessageSubmitOnMoreColumns(errType, errCode, errMessage, Set(errSourceColName), additionalInfo) { + } object ErrorMessageSubmitOnColumn { - def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = { + /** + * Convenient apply function + * @param errType - error type + * @param errCode - error code + * @param errMessage - error message + * @param errSourceColName - the name of the column the error was detected on + * @param additionalInfo - optional additional info in form of JSON + * @return - instance of [[ErrorMessageSubmitOnColumn]] + * @group Error Handling + * @since 0.6.0 + */ + def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = { new ErrorMessageSubmitOnColumn( ColumnOrValue.withValue(errType), ColumnOrValue.withValue(errCode), ColumnOrValue.withValue(errMessage), - errColName, + errSourceColName, ColumnOrValue.withOption(additionalInfo) ) } diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumns.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumns.scala index d3ac19d6..3fcaf563 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumns.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumns.scala @@ -19,27 +19,49 @@ package za.co.absa.spark.commons.errorhandling.implementations.submits import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit import za.co.absa.spark.commons.errorhandling.types._ +/** + * [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]] subclass to represent an error bound + * 
to multiple columns.
+ * @param errType - error type
+ * @param errCode - error code
+ * @param errMessage - error message
+ * @param errSourceColNames - the names of the columns the error was detected on
+ * @param additionalInfo - optional additional info in form of JSON
+ * @group Error Handling
+ * @since 0.6.0
+ */
 class ErrorMessageSubmitOnMoreColumns(
   val errType: ColumnOrValue[ErrType],
   val errCode: ColumnOrValue[ErrCode],
-  val errMsg: ColumnOrValue[ErrMsg],
-  errColNames: Set[ErrSourceColName],
-  override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
+  val errMessage: ColumnOrValue[ErrMsg],
+  errSourceColNames: Set[ErrSourceColName],
+  override val additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
 ) extends ErrorMessageSubmit {
-  val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue.asMapOfStringColumns(errColNames)
+  val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue.asMapOfStringColumns(errSourceColNames)
 }
 
 object ErrorMessageSubmitOnMoreColumns {
+  /**
+   * Convenient apply function
+   * @param errType - error type
+   * @param errCode - error code
+   * @param errMessage - error message
+   * @param errSourceColNames - the names of the columns the error was detected on
+   * @param additionalInfo - optional additional info in form of JSON
+   * @return - instance of [[ErrorMessageSubmitOnMoreColumns]]
+   * @group Error Handling
+   * @since 0.6.0
+   */
   def apply(errType: ErrType,
             errCode: ErrCode,
             errMessage: ErrMsg,
-            errColNames: Set[ErrSourceColName],
+            errSourceColNames: Set[ErrSourceColName],
             additionalInfo: AdditionalInfo = None): ErrorMessageSubmitOnMoreColumns = {
     new ErrorMessageSubmitOnMoreColumns(
       ColumnOrValue.withValue(errType),
       ColumnOrValue.withValue(errCode),
       ColumnOrValue.withValue(errMessage),
-      errColNames,
+      errSourceColNames,
       ColumnOrValue.withOption(additionalInfo)
     )
   }
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumn.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumn.scala
index a9322645..aa9bb6c4 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumn.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumn.scala
@@ -19,14 +19,23 @@ package za.co.absa.spark.commons.errorhandling.implementations.submits
 
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.typedLit
 import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
-import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitWithoutColumn.emptyErrorColsAndValues
 import za.co.absa.spark.commons.errorhandling.types._
 
+/**
+ * [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]] subclass to represent an error not
+ * bound to any particular column.
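+ * For instance (the values are illustrative only):
+ * {{{
+ *   ErrorMessageSubmitWithoutColumn("ROW_ERROR", 10, "The row as a whole failed the validation")
+ * }}}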
+ * @param errType - error type + * @param errCode - error code + * @param errMessage - error message + * @param additionalInfo - optional additional info in form of JSON + * @group Error Handling + * @since 0.6.0 + */ class ErrorMessageSubmitWithoutColumn( val errType: ColumnOrValue[ErrType], val errCode: ColumnOrValue[ErrCode], - val errMsg: ColumnOrValue[ErrMsg], - override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty + val errMessage: ColumnOrValue[ErrMsg], + override val additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty ) extends ErrorMessageSubmit { val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(ErrorMessageSubmitWithoutColumn.emptyErrColsAndValues) @@ -36,6 +45,16 @@ object ErrorMessageSubmitWithoutColumn { private val emptyErrorColsAndValues: ErrColsAndValues = Map.empty val emptyErrColsAndValues: Column = typedLit(emptyErrorColsAndValues) + /** + * Convenient apply function + * @param errType - error type + * @param errCode - error code + * @param errMessage - error message + * @param additionalInfo - optional additional info in form of JSON + * @return - instance of [[ErrorMessageSubmitWithoutColumn]] + * @group Error Handling + * @since 0.6.0 + */ def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitWithoutColumn = { new ErrorMessageSubmitWithoutColumn( ColumnOrValue.withValue(errType), diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/ErrorHandlingCommon.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/ErrorHandlingCommon.scala deleted file mode 100644 index 56195860..00000000 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/ErrorHandlingCommon.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package za.co.absa.spark.commons.errorhandling.partials - -import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression} -import org.apache.spark.sql.{Column, DataFrame} -import za.co.absa.spark.commons.errorhandling.{ErrorHandling, ErrorMessageSubmit} -import za.co.absa.spark.commons.errorhandling.types._ -import org.apache.spark.sql.functions.when -import org.apache.spark.sql.types.DataType - -trait ErrorHandlingCommon extends ErrorHandling { - protected def evaluate(errorMessageSubmit: ErrorMessageSubmit): Column - - protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame - - def putErrorToColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn = { - ErrorColumn(evaluate(errorMessageSubmit)) - } - - def aggregateErrorColumns(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame = { - doTheColumnsAggregation(dataFrame, errCols.map(_.column): _*) - } - - def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame = { - val errorsByColumn = errorsWhen.groupBy(_.errorMessageSubmit.errColsAndValues.columnNames) - val noColNames = Set.empty[String] - val errorColumns1 = errorsByColumn.getOrElse(noColNames, Seq.empty).map(errorWhenToCol) // no grouping without ErrCol names - val errorColumns2 = (errorsByColumn - noColNames).values.map(errorWhenSeqToCol).toSeq - doTheColumnsAggregation(dataFrame, errorColumns1 ++ errorColumns2: _*) - } - - - private def errorWhenToCol(errorWhen: ErrorWhen): Column = { - when(errorWhen.when, evaluate(errorWhen.errorMessageSubmit)) - } - - private def errorWhenSeqToCol(errorsWhen: Seq[ErrorWhen]): Column = { - val branches: Seq[(Expression, Expression)] = errorsWhen.map(errorWhen => (errorWhen.when.expr, evaluate(errorWhen.errorMessageSubmit).expr)) - new Column(CaseWhen(branches)) - } - -} diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateIntoErrorMessage.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformIntoErrorMessage.scala similarity index 67% rename from spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateIntoErrorMessage.scala rename to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformIntoErrorMessage.scala index e3a44f3d..3b2e6314 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateIntoErrorMessage.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformIntoErrorMessage.scala @@ -20,21 +20,27 @@ import org.apache.spark.sql.Column import org.apache.spark.sql.functions.struct import org.apache.spark.sql.types._ import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit -import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._ +import za.co.absa.spark.commons.errorhandling.partials.TransformIntoErrorMessage.FieldNames._ -trait EvaluateIntoErrorMessage { - protected def evaluate(errorMessageSubmit: ErrorMessageSubmit): Column = { +/** + * Trait offers a presumably very common implementation of [[za.co.absa.spark.commons.errorhandling.ErrorHandling.transformErrorSubmitToColumn ErrorHandling.transformErrorSubmitToColumn()]], + * where the error is transformed into the struct of [[za.co.absa.spark.commons.errorhandling.ErrorMessage ErrorMessage]]. 
+ * @group Error Handling + * @since 0.6.0 + */ +trait TransformIntoErrorMessage { + protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column = { struct( errorMessageSubmit.errType.column as errType, errorMessageSubmit.errCode.column as errCode, - errorMessageSubmit.errMsg.column as errMsg, + errorMessageSubmit.errMessage.column as errMsg, errorMessageSubmit.errColsAndValues.column as errColsAndValues, - errorMessageSubmit.additionInfo.column as additionInfo + errorMessageSubmit.additionalInfo.column as additionInfo ) } /** - * errorColumnType describes what is the type of error columns + * Describes what is the type of the error column * @return StructType of DataType object */ def errorColumnType: DataType = { @@ -48,14 +54,14 @@ trait EvaluateIntoErrorMessage { } } -object EvaluateIntoErrorMessage { +object TransformIntoErrorMessage { object FieldNames { val errType = "errType" val errCode = "errCode" val errMsg = "errMsg" val errColsAndValues = "errColsAndValues" val additionInfo = "additionInfo" - val errCols = "errCols" + val errSourceCols = "errSourceCols" val errValues = "errValues" } diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateViaUdf.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformViaUdf.scala similarity index 69% rename from spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateViaUdf.scala rename to spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformViaUdf.scala index 65c0af8a..f84b1c12 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/EvaluateViaUdf.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/partials/TransformViaUdf.scala @@ -20,23 +20,23 @@ import org.apache.spark.sql.{Column, SparkSession} import za.co.absa.spark.commons.OncePerSparkSession import za.co.absa.spark.commons.adapters.CallUdfAdapter import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit -import za.co.absa.spark.commons.errorhandling.partials.EvaluateViaUdf.ErrorMessageFunction +import za.co.absa.spark.commons.errorhandling.partials.TransformViaUdf.ErrorMessageFunction import za.co.absa.spark.commons.errorhandling.types._ -trait EvaluateViaUdf[T] extends OncePerSparkSession with CallUdfAdapter { - def evaluationUdfName: String - protected def evaluationUdf: ErrorMessageFunction[T] +trait TransformViaUdf[T] extends OncePerSparkSession with CallUdfAdapter { + def transformationUdfName: String + protected def transformationUdf: ErrorMessageFunction[T] - protected def evaluate(errorMessageSubmit: ErrorMessageSubmit): Column = { - call_udf(evaluationUdfName, + protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column = { + call_udf(transformationUdfName, errorMessageSubmit.errType.column, errorMessageSubmit.errCode.column, - errorMessageSubmit.errMsg.column, + errorMessageSubmit.errMessage.column, errorMessageSubmit.errColsAndValues.column, - errorMessageSubmit.additionInfo.column) + errorMessageSubmit.additionalInfo.column) } } -object EvaluateViaUdf { +object TransformViaUdf { type ErrorMessageFunction[T] = (ErrType, ErrCode, ErrMsg, ErrColsAndValues, AdditionalInfo) => T //TODO needed? 
 }
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValue.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValue.scala
index d82df650..c65c2541 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValue.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValue.scala
@@ -23,36 +23,117 @@ import za.co.absa.spark.commons.sql.functions.null_col
 
 import scala.language.higherKinds
 
+/**
+ * Trait to unify the representation of the segments of a [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]].
+ * It can be built from a `column`, a column name or a set of column names, a constant value, and others.
+ * The trait then provides the ability to express each option as a Spark column used in other [[za.co.absa.spark.commons.errorhandling.ErrorHandling ErrorHandling]]
+ * classes and methods.
+ * @tparam T - the type of the value and the Scala equivalent of the column DataType
+ * @group Error Handling
+ * @since 0.6.0
+ */
 trait ColumnOrValue[T] {
+  /**
+   * @return `column` expression representing the input
+   * @group Error Handling
+   * @since 0.6.0
+   */
   def column: Column
+
+  /**
+   * @return the name or names if columns are directly referenced.
+   * @group Error Handling
+   * @since 0.6.0
+   */
   def columnNames: Set[String]
+
+  /**
+   * @return the constant value if the entity was built from one, otherwise `None`
+   * @group Error Handling
+   * @since 0.6.0
+   */
+  def getValue: Option[T]
 }
 
object ColumnOrValue {
+  /**
+   * Just a shorthand alias of [[ColumnOrValue]], for less typing
+   * @tparam T - the type of the value and the Scala equivalent of the column DataType
+   * @group Error Handling
+   * @since 0.6.0
+   */
  type CoV[T] = ColumnOrValue[T] //just a shorthand
  val CoV: ColumnOrValue.type = ColumnOrValue
 
+  /**
+   * Referencing exactly one column, by its name
+   * @param columnName - the column name
+   * @tparam T - the Scala type equivalent to the column `DataType`
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def apply[T](columnName: String): ColumnOrValue[T] = CoVNamedColumn(columnName)
 
+  /**
+   * Referencing a column by its expression
+   * @param column - the column expression
+   * @tparam T - the Scala type equivalent to the column `DataType`
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def apply[T](column: Column): ColumnOrValue[T] = CoVDefinedColumn(column)
 
+  /**
+   * Referencing a column which is a map of column names and their values transformed by the transformer
+   * @param mapColumnNames - the column names in the map
+   * @param columnTransformer - function to transform the column values with
+   * @tparam T - the Scala type equivalent to the column `DataType`
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def apply[T](mapColumnNames: Set[String], columnTransformer: ColumnTransformer): ColumnOrValue[Map[String, T]] = {
    CoVMapColumn(mapColumnNames, columnTransformer)
  }
 
+  /**
+   * Representing an optional string value - a String or NULL
+   * @param value - the value to represent in the constant column, or NULL if None
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def withOption(value: Option[String]): ColumnOrValue[Option[String]] = { // could be safely an apply, or done more generally
    value match {
      case None => CoVNull(StringType)
      case Some(x) => CoVOption(x)
    }
  }
+
+  /**
+   * Referencing a constant value
+   * @param value - the constant the column is to represent
+   * @tparam T - the Scala type equivalent to the column `DataType`
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def withValue[T](value: T): ColumnOrValue[T] = CoVValue(value)
 
+  /**
+   * @return - column of NULL values as StringType
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def asEmpty: ColumnOrValue[Option[String]] = CoVNull(StringType)
+
+  /**
+   * Referencing a column which is a map of column names and their values cast to string
+   * @param mapColumnNames - the column names in the map
+   * @group Error Handling
+   * @since 0.6.0
+   */
  def asMapOfStringColumns(mapColumnNames: Set[String]): ColumnOrValue[Map[String, String]] = CoVMapColumn(mapColumnNames, columnNameToItsStringValue)
 
-  def columnNameToItsStringValue(colName: String): Column = col(colName).cast(StringType)
+  private def columnNameToItsStringValue(colName: String): Column = col(colName).cast(StringType)
 
  private final case class CoVNamedColumn[T](columnName: String) extends ColumnOrValue[T] {
    val column: Column = col(columnName)
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ErrorWhen.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ErrorWhen.scala
index bf506691..64722ed6 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ErrorWhen.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/types/ErrorWhen.scala
@@ -19,6 +19,14 @@ package za.co.absa.spark.commons.errorhandling.types
 
 import org.apache.spark.sql.Column
 import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
 
+/**
+ * A case class that puts together an error specification and the condition to identify it.
+ * The primary usage is in [[za.co.absa.spark.commons.errorhandling.ErrorHandling.putErrorsWithGrouping ErrorHandling.putErrorsWithGrouping()]]
+ * @param when - boolean column expression that should evaluate to true if and only if the error is detected
+ * @param errorMessageSubmit - the error specification
+ * @group Error Handling
+ * @since 0.6.0
+ */
 case class ErrorWhen (
   when: Column,
   errorMessageSubmit: ErrorMessageSubmit
diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/sql/functions.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/sql/functions.scala
index bde25f9b..47542036 100644
--- a/spark-commons/src/main/scala/za/co/absa/spark/commons/sql/functions.scala
+++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/sql/functions.scala
@@ -56,16 +56,20 @@
   }
 
   /**
-   * Provides a column with NULL value.
-   *
-   * @return The column of NULL values
+   * Provides a column of NULL values.
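+   * For example, `df.withColumn("empty", null_col)` attaches a column holding only NULLs (the DataFrame `df` is illustrative).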
+ * @return - column of NULL values */ def null_col:Column = { lit(None.orNull) } + /** + * Provides a column of NULL values, but the actual type is per specification + * @param dataType - the actual data type of the column that will contain NULLs + * @return - column of NULL values + */ def null_col(dataType: DataType):Column = { - lit(None.orNull).cast(dataType) + null_col.cast(dataType) } } diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala index 1783d0bb..4b5ad5a3 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{ArrayType, StructType} import org.apache.spark.sql.{Column, DataFrame} import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements +import za.co.absa.spark.commons.sql.functions.null_col import za.co.absa.spark.commons.utils.explode.{Explosion, ExplosionContext} import za.co.absa.spark.hats.Extensions.DataFrameExtension @@ -343,7 +344,7 @@ object ExplodeTools { private def addSuperTransientField(inputDf: DataFrame, arrayColPathName: String): (DataFrame, String) = { val colName = inputDf.schema.getClosestUniqueName(superTransientColumnName) val nestedColName = (SchemaUtils.splitPath(arrayColPathName).dropRight(1) :+ colName).mkString(".") - val df = inputDf.nestedWithColumn(nestedColName, lit(null)) + val df = inputDf.nestedWithColumn(nestedColName, null_col) (df, nestedColName) } diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala index d4ddf1bc..c8cb2fe1 100644 --- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala +++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala @@ -43,13 +43,13 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa test("aggregateErrorColumns should return an empty list after error aggregation") { val expectedResults: List[ResultDfRecordType] = List() - val e1 = ErrorHandlingFilterRowsWithErrors.putErrorToColumn("Test error 1", 1, "This is a test error", Some(col1Name)) + val e1 = ErrorHandlingFilterRowsWithErrors.createErrorAsColumn("Test error 1", 1, "This is a test error", Some(col1Name)) val errorSubmitA = ErrorMessageSubmitOnColumn("Test error 2", 2, "This is a test error", col2Name) - val e2 = ErrorHandlingFilterRowsWithErrors.putErrorToColumn(errorSubmitA) + val e2 = ErrorHandlingFilterRowsWithErrors.createErrorAsColumn(errorSubmitA) val errorSubmitB = ErrorMessageSubmitWithoutColumn("Test error 3", 3, "This is a test error") - val e3 = ErrorHandlingFilterRowsWithErrors.putErrorToColumn(errorSubmitB) + val e3 = ErrorHandlingFilterRowsWithErrors.createErrorAsColumn(errorSubmitB) - val resultsDF = ErrorHandlingFilterRowsWithErrors.aggregateErrorColumns(srcDf)(e1, e2, e3) + val resultsDF = ErrorHandlingFilterRowsWithErrors.applyErrorColumnsToDataFrame(srcDf)(e1, e2, e3) val results = resultDfToResult(resultsDF) assert(results.length == expectedResults.length) @@ -91,7 +91,7 @@ class 
ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa } test("errorColumnType should return a BooleanType") { - val errorColumn: ErrorColumn = ErrorHandlingFilterRowsWithErrors.putErrorToColumn( + val errorColumn: ErrorColumn = ErrorHandlingFilterRowsWithErrors.createErrorAsColumn( "Test error 1", 1, "This is a test error", Some(errColName)) val testDf = emptyDf.withColumn(errColName, errorColumn.column) @@ -104,17 +104,17 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa assert(actualType == expectedType) } - test("errorColumnAggregationType should return None since no column is added during the aggregation") { - val errorColumn: ErrorColumn = ErrorHandlingFilterRowsWithErrors.putErrorToColumn( + test("dataFrameColumnType should return None since no column is added during the aggregation") { + val errorColumn: ErrorColumn = ErrorHandlingFilterRowsWithErrors.createErrorAsColumn( "1st error", 0, "This is an error", Some(errColName) ) val testDf = emptyDf - val expectedAfterAgg = ErrorHandlingFilterRowsWithErrors.aggregateErrorColumns(testDf)(errorColumn) + val expectedAfterAgg = ErrorHandlingFilterRowsWithErrors.applyErrorColumnsToDataFrame(testDf)(errorColumn) val expectedTypeAfterAgg = expectedAfterAgg.schema.fields.headOption - val actualType = ErrorHandlingFilterRowsWithErrors.errorColumnAggregationType + val actualType = ErrorHandlingFilterRowsWithErrors.dataFrameColumnType assert(actualType == expectedTypeAfterAgg) } diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArrayTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArrayTest.scala index 99c6fc0c..94701274 100644 --- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArrayTest.scala +++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorMessageArrayTest.scala @@ -33,7 +33,6 @@ class ErrorMessageArrayTest extends AnyFunSuite with SparkTestBase { private val col1Name = "Col1" private val col2Name = "Col2" - private val errColName = "errCol" private val srcDf = Seq( (None, ""), (Some(1), "a"), @@ -79,13 +78,13 @@ class ErrorMessageArrayTest extends AnyFunSuite with SparkTestBase { val errorMessageArray = ErrorMessageArray() - val e1 = errorMessageArray.putErrorToColumn("Test error 1", 1, "This is a test error", Some(col1Name)) + val e1 = errorMessageArray.createErrorAsColumn("Test error 1", 1, "This is a test error", Some(col1Name)) val errorSubmitA = ErrorMessageSubmitOnColumn("Test error 2", 2, "This is a test error", col2Name) - val e2 = errorMessageArray.putErrorToColumn(errorSubmitA) + val e2 = errorMessageArray.createErrorAsColumn(errorSubmitA) val errorSubmitB = ErrorMessageSubmitWithoutColumn("Test error 3", 3, "This is a test error") - val e3 = errorMessageArray.putErrorToColumn(errorSubmitB) + val e3 = errorMessageArray.createErrorAsColumn(errorSubmitB) - val resultDf = errorMessageArray.aggregateErrorColumns(srcDf)(e1, e2, e3) + val resultDf = errorMessageArray.applyErrorColumnsToDataFrame(srcDf)(e1, e2, e3) val result = resultDfToResult(resultDf) assert(result == expected) @@ -193,27 +192,27 @@ class ErrorMessageArrayTest extends AnyFunSuite with SparkTestBase { val result = resultDfToResult(resultDf) assert(result == expected) + assert(resultDf.columns.contains("MyErrCol")) } - - test("errorColumnAggregationType should return an ArrayType structure for column added 
during the aggregation") { + test("dataFrameColumnType should return an ArrayType structure for column added during the aggregation") { val errColName = "specialErrCol" val errorMessageArray = ErrorMessageArray(errColName) - val e1 = errorMessageArray.putErrorToColumn("Test error 1", 1, "This is a test error", Some(col1Name)) + val e1 = errorMessageArray.createErrorAsColumn("Test error 1", 1, "This is a test error", Some(col1Name)) val errorSubmitA = ErrorMessageSubmitOnColumn("Test error 2", 2, "This is a test error", col2Name) - val e2 = errorMessageArray.putErrorToColumn(errorSubmitA) + val e2 = errorMessageArray.createErrorAsColumn(errorSubmitA) val errorSubmitB = ErrorMessageSubmitWithoutColumn("Test error 3", 3, "This is a test error") - val e3 = errorMessageArray.putErrorToColumn(errorSubmitB) + val e3 = errorMessageArray.createErrorAsColumn(errorSubmitB) val origSchemaSize = srcDf.schema.fieldNames.length - val dfAfterAgg = errorMessageArray.aggregateErrorColumns(srcDf)(e1, e2, e3) + val dfAfterAgg = errorMessageArray.applyErrorColumnsToDataFrame(srcDf)(e1, e2, e3) assert(dfAfterAgg.schema.fieldNames.length == origSchemaSize + 1) // checkc only one field was added... assert(dfAfterAgg.schema.fieldNames(origSchemaSize) == errColName) // and is of correct name... //... and type val addedColType = dfAfterAgg.schema.fields(origSchemaSize).dataType // while using `get` in `Option` is discouraged, it's ok here, as it's expected the option to be non-empty; and if empty // the test will fail, which is correct - val result = errorMessageArray.errorColumnAggregationType.get + val result = errorMessageArray.dataFrameColumnType.get import DataTypeImplicits.DataTypeEnhancements assert(result.isEquivalentDataType(addedColType)) } diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala index 37c7f2d1..3fc420e9 100644 --- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala +++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2023 ABSA Group Limited + * Copyright 2021 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala
index 37c7f2d1..3fc420e9 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitJustErrorValueTest.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -40,9 +40,9 @@ class ErrorMessageSubmitJustErrorValueTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedErrValuesCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 
   test("Apply function properly hands over data with additional info") {
@@ -63,8 +63,9 @@ class ErrorMessageSubmitJustErrorValueTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedErrValuesCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
+
   }
 }
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumnTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumnTest.scala
index 432986ce..47072491 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumnTest.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnColumnTest.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,9 +39,9 @@ class ErrorMessageSubmitOnColumnTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 
   test("Apply function properly hands over data with additional info") {
@@ -61,8 +61,8 @@ class ErrorMessageSubmitOnColumnTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 }
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumnsTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumnsTest.scala
index 656a78d2..262b23f2 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumnsTest.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitOnMoreColumnsTest.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,9 +44,9 @@ class ErrorMessageSubmitOnMoreColumnsTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 
   test("Apply function properly hands over data with additional info") {
@@ -71,8 +71,8 @@ class ErrorMessageSubmitOnMoreColumnsTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 }
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumnTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumnTest.scala
index 9ea0a8fe..e1433cff 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumnTest.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/submits/ErrorMessageSubmitWithoutColumnTest.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
 
 package za.co.absa.spark.commons.errorhandling.implementations.submits
 
-import org.apache.spark.sql.functions.{lit, map, typedLit}
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StringType
 import org.scalatest.funsuite.AnyFunSuite
 import za.co.absa.spark.commons.errorhandling.types.{AdditionalInfo, ColumnOrValueForm, ErrColsAndValues}
@@ -38,10 +38,11 @@ class ErrorMessageSubmitWithoutColumnTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     result.errColsAndValues.column.expr
     expectedErrValuesCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
+
   }
 
   test("Apply function properly hands over data with additional info") {
@@ -49,7 +50,6 @@ class ErrorMessageSubmitWithoutColumnTest extends AnyFunSuite {
     val errCode = 201L
     val errMsg = "This is a test error"
     val additionalInfo = "{}"
-    val columnValue: ErrColsAndValues = Map.empty
 
     val result = ErrorMessageSubmitWithoutColumn(errType, errCode, errMsg, Some(additionalInfo))
 
@@ -61,8 +61,8 @@ class ErrorMessageSubmitWithoutColumnTest extends AnyFunSuite {
 
     expectedErrType assertTo result.errType
     expectedErrCode assertTo result.errCode
-    expectedErrMsg assertTo result.errMsg
+    expectedErrMsg assertTo result.errMessage
     expectedErrValuesCol assertTo result.errColsAndValues
-    expectedAdditionalInfo assertTo result.additionInfo
+    expectedAdditionalInfo assertTo result.additionalInfo
   }
 }
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueForm.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueForm.scala
index 1dd042f3..1d5288e1 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueForm.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueForm.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueTest.scala
index 0829205d..c28d19e7 100644
--- a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueTest.scala
+++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/types/ColumnOrValueTest.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 ABSA Group Limited
+ * Copyright 2021 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
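Across all the submit-class tests above, the `ErrorMessageSubmit` accessors are renamed consistently: `errMsg` becomes `errMessage` and `additionInfo` becomes `additionalInfo`. A hypothetical REPL-style sketch of reading a submit under the renamed API, reusing the constructor arguments from `ErrorMessageSubmitWithoutColumnTest` (the `errType` value is an illustrative stand-in; it is not shown in these hunks):

```scala
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitWithoutColumn

val submit = ErrorMessageSubmitWithoutColumn("Test error", 201L, "This is a test error", Some("{}"))

submit.errType          // the error type handed over to the ErrorHandling implementation
submit.errCode          // the error code (201L above)
submit.errMessage       // renamed from errMsg
submit.errColsAndValues // empty for the "without column" submit
submit.additionalInfo   // renamed from additionInfo (Some("{}") above)
```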