#101: ErrorHandling documentation and fields renames #103

Merged
Changes from 22 commits

Commits (30)
7178f32
#83: Create a Spike for error handling
benedeki Mar 7, 2023
de99046
* UT fix
benedeki Mar 7, 2023
2db8c95
* Relatively big overwrite to use `map` instead of errCol and sequenc…
benedeki Mar 14, 2023
51b16f4
* Forgotten `register` function call
benedeki Mar 14, 2023
4783fca
* line ends improved
benedeki Mar 14, 2023
c8c4aa0
* ErrorMessageSubmits moved to submits sub-package
benedeki Mar 16, 2023
8e747e7
Merge pull request #85 from AbsaOSS/feature/83-create-a-spike-for-err…
benedeki Mar 16, 2023
64dc95e
* Added UTs for `ColumnOrValue`
benedeki Mar 20, 2023
e29098d
* changed `ErrorMessageArrayTest` to actual test suite
benedeki Apr 24, 2023
8fe1b27
* `ErrorHandling` - put abstract methods first
benedeki Apr 25, 2023
9543c94
* addressed PR comments
benedeki Apr 25, 2023
8822c8a
Update spark-commons/src/main/scala/za/co/absa/spark/commons/errorhan…
benedeki Apr 26, 2023
eec8d4f
Merge branch 'master' into feature/83-create-a-spike-for-error-handling
benedeki Apr 26, 2023
d9a2129
* Further PR comments addressed
benedeki Apr 26, 2023
1c59acf
Merge branch 'master' into feature/83-create-a-spike-for-error-handling
benedeki Apr 30, 2023
5b317d8
* more UTs
benedeki Apr 30, 2023
96c2308
* Added Jacoco exclusion for adapters
benedeki Apr 30, 2023
30f50c3
#101: ErrorHandling documentation and fields renames
benedeki May 2, 2023
38b54b4
* Fixing license year
benedeki May 3, 2023
93a5c3b
Merge branch 'master' into feature/101-errorhandling-documentation-an…
benedeki May 4, 2023
712931b
* Fix after merge with develop
benedeki May 4, 2023
b99d06a
* fixed and added cross-links
benedeki May 5, 2023
efda9fb
Merge branch 'master' into feature/101-errorhandling-documentation-an…
benedeki May 8, 2023
6e24db0
* Fixed after merge and conflicts resolution
benedeki May 8, 2023
68fc0aa
* PR comments addressed
benedeki May 8, 2023
963787c
* README.md enhanced
benedeki May 9, 2023
656dfc5
* Removed unused code
benedeki May 11, 2023
b789cac
Merge branch 'master' into feature/101-errorhandling-documentation-an…
benedeki May 23, 2023
378f1fe
Merge branch 'master' into feature/101-errorhandling-documentation-an…
benedeki Jun 20, 2023
49855db
* changed "dead code" to documentation string
benedeki Jun 20, 2023
2 changes: 1 addition & 1 deletion .github/workflows/jacoco_check.yml
@@ -1,5 +1,5 @@
#
# Copyright 2023 ABSA Group Limited
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -1,5 +1,5 @@
#
# Copyright 2022 ABSA Group Limited
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
7 changes: 7 additions & 0 deletions build.sbt
@@ -36,6 +36,13 @@ lazy val commonSettings = Seq(
Test / parallelExecution := false
)

lazy val commonJacocoExcludes: Seq[String] = Seq(
"za.co.absa.spark.commons.adapters.CallUdfAdapter",
"za.co.absa.spark.commons.adapters.TransformAdapter"
// "za.co.absa.spark.commons.utils.JsonUtils*", // class and related objects
Contributor: I see dead code; is it reserved for something, or can it be removed?

@lsulak (Collaborator), Jun 20, 2023: Still here (and 1 below)

// "za.co.absa.spark.commons.utils.ExplodeTools" // class only
Contributor: And here as well.

Contributor: The example with * is still a hint.

benedeki (Author): I will actually change it to make it obvious it's documentation and not dead code 😉

)

lazy val parent = (project in file("."))
.aggregate(sparkCommons.projectRefs ++ sparkCommonsTest.projectRefs: _*)
.settings(
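As an aside, a minimal sketch of how this shared exclusion list might be wired into a module's settings. This is not taken from the PR: the module definition is illustrative and the `Test / jacocoExcludes` key scoping is an assumption based on the sbt-jacoco plugin.

// Hedged sketch only; module name is hypothetical and the jacocoExcludes
// key/scoping is assumed from sbt-jacoco, not shown in this diff.
lazy val sparkCommonsExample = (project in file("spark-commons"))
  .settings(
    commonSettings,
    Test / jacocoExcludes := commonJacocoExcludes  // the shared exclusions defined above
  )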
2 changes: 1 addition & 1 deletion project/JacocoSetup.scala
@@ -1,5 +1,5 @@
/*
* Copyright 2023 ABSA Group Limited
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -37,3 +37,4 @@ addSbtPlugin("org.ow2.asm" % "asm-commons" % ow2Version from ow2Url("asm-commons"
addSbtPlugin("org.ow2.asm" % "asm-tree" % ow2Version from ow2Url("asm-tree"))

addSbtPlugin("za.co.absa.sbt" % "sbt-jacoco" % "3.4.1-absa.3" from "https://github.com/AbsaOSS/sbt-jacoco/releases/download/3.4.1-absa.3/sbt-jacoco-3.4.1-absa.3.jar")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
* instantiated classes thus not running the method [[register]] again on them.
*
* Usage: extend this abstract class and implement the method [[register]]. On initialization the
* [[register]] method gets called by the [[za.co.absa.spark.commons.OncePerSparkSession$.registerMe]] method if the class + spark session
* [[register]] method gets called by the [[za.co.absa.spark.commons.OncePerSparkSession$.registerMe OncePerSparkSession.registerMe()]] method if the class + spark session
* combination is unique. If it is not unique [[register]] will not get called again.
* This way we ensure only single registration per spark session.
*
@@ -16,25 +16,130 @@

package za.co.absa.spark.commons.errorhandling

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression}
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.spark.commons.errorhandling.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
import za.co.absa.spark.commons.errorhandling.types._

/**
* The base trait of the error handling component. Every library that wants to use the component during Spark data
* processing should utilize this trait and its methods. The methods serve to record the errors and attach them to the
* [[org.apache.spark.sql.DataFrame spark.DataFrame]]. The trait should be an input parameter for such a library, perhaps as an implicit.
* On the other side, the end application provides a concrete [[ErrorHandling]] implementation that does the actual error
* handling as the application desires.
* For ease of use, and as examples, a few general implementations are provided in the `implementations` sub-package.
* For common, repeated patterns the `partials` package offers some helper traits.
*/
trait ErrorHandling {
def putErrorToColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn
/**
* The first of the few methods that need to be implemented in the trait's implementation.
* The purpose of this method is to convert the error specification into a [[org.apache.spark.sql.Column spark.Column]] expression.
* @param errorMessageSubmit - the error specification
* @return - the error specification transformed into a column expression
* @group Error Handling
* @since 0.6.0
*/
protected def transformErrorSubmitToColumn(errorMessageSubmit: ErrorMessageSubmit): Column

def aggregateErrorColumns(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame
/**
* Applies the provided columns to the incoming [[org.apache.spark.sql.DataFrame spark.DataFrame]]. Usually they are aggregated in some way and attached
* to the [[org.apache.spark.sql.DataFrame spark.DataFrame]], but other operations are imaginable. In typical usage, the incoming columns are those
* produced by [[transformErrorSubmitToColumn]].
* The idea is that an error column contains information about the error that occurred on the row, or is empty (NULL)
* otherwise.
* In each implementation, calling the function on each column separately or on any grouping of columns should produce
* the same result (with the exception of the order of errors in the aggregation).
* @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to apply the error columns to
* @param errCols - the list of error columns to apply
* @return - data frame with the error columns applied (aggregated and attached or done otherwise)
*/
protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame

/**
* The idea of this function is: "Put the specified error to the provided data frame if the condition is true on the row."
* The error is transformed to a column using the [[transformErrorSubmitToColumn]] method and applied to the data frame
* if the "when" condition is true using the [[doApplyErrorColumnsToDataFrame]] method.
* @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
* @param when - the condition that defines the error occurred on the row
* @param errorMessageSubmit - the detected error specification
* @return - the original [[org.apache.spark.sql.DataFrame spark.DataFrame]] with the error detection applied
* @group Error Handling
* @since 0.6.0
*/
def putError(dataFrame: DataFrame)(when: Column)(errorMessageSubmit: ErrorMessageSubmit): DataFrame = {
putErrorsWithGrouping(dataFrame)(Seq(ErrorWhen(when, errorMessageSubmit)))
}
def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame

def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
val toSubmit = errCol
/**
* Same as [[putError]], but allows a series of condition-error pairs to be specified at once.
* Note that once an error has been identified for a field on the row, no more conditions bound to that
* field are evaluated.
* @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
* @param errorsWhen - the list of condition-error pairs; the conditions are grouped by the field of the error submissions
* @return - the original data frame with the error detection applied
* @group Error Handling
* @since 0.6.0
*/
def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame = {
def errorWhenToCol(errorWhen: ErrorWhen): Column = {
when(errorWhen.when, transformErrorSubmitToColumn(errorWhen.errorMessageSubmit))
}
def errorWhenSeqToCol(errorsWhen: Seq[ErrorWhen]): Column = {
val branches: Seq[(Expression, Expression)] = errorsWhen.map(errorWhen => (errorWhen.when.expr, transformErrorSubmitToColumn(errorWhen.errorMessageSubmit).expr))
new Column(CaseWhen(branches))
}

val errorsByColumn = errorsWhen.groupBy(_.errorMessageSubmit.errColsAndValues.columnNames)
val noColNames = Set.empty[String]
val errorColumns1 = errorsByColumn.getOrElse(noColNames, Seq.empty).map(errorWhenToCol) // no grouping without ErrCol names
val errorColumns2 = (errorsByColumn - noColNames).values.map(errorWhenSeqToCol).toSeq
doApplyErrorColumnsToDataFrame(dataFrame, errorColumns1 ++ errorColumns2: _*)
}

/**
* Transforms error information into a column expression, for cases when the simple column expression condition used in
* [[putError]] is not suitable for whatever reason.
* The returned [[types.ErrorColumn]] should then be used in [[applyErrorColumnsToDataFrame]].
* @param errorMessageSubmit - the error specification
* @return - [[types.ErrorColumn]] expression containing the error specification
* @group Error Handling
* @since 0.6.0
*/
def createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit): ErrorColumn = {
ErrorColumn(transformErrorSubmitToColumn(errorMessageSubmit))
}

/**
* Same as the other [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn(errorMessageSubmit: ErrorMessageSubmit)]], only providing the error specification
* in decomposed state, not in the [[ErrorMessageSubmit]] trait form.
* @param errType - textual description of the type of the error
* @param errCode - numeric designation of the type of the error
* @param errMessage - human-friendly description of the error
* @param errSourceColName - the name of the column the error happened at
* @param additionalInfo - any optional additional info in JSON format
* @return - [[types.ErrorColumn]] expression containing the error specification
* @group Error Handling
* @since 0.6.0
*/
def createErrorAsColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
val toSubmit = errSourceColName
.map(errSourceColName => ErrorMessageSubmitOnColumn(errType, errCode, errMessage, errSourceColName, additionalInfo))
.getOrElse(ErrorMessageSubmitWithoutColumn(errType, errCode, errMessage, additionalInfo))
putErrorToColumn(toSubmit)
createErrorAsColumn(toSubmit)
}

/**
* Applies the earlier collected [[types.ErrorColumn ErrorColumns]] to the provided [[org.apache.spark.sql.DataFrame spark.DataFrame]].
* See [[doApplyErrorColumnsToDataFrame]] for detailed functional explanation.
* @param dataFrame - the [[org.apache.spark.sql.DataFrame spark.DataFrame]] to operate on
* @param errCols - a list of [[types.ErrorColumn]] returned by previous calls of [[ErrorHandling!.createErrorAsColumn(errorMessageSubmit:za\.co\.absa\.spark\.commons\.errorhandling\.ErrorMessageSubmit)* createErrorAsColumn]]
* @return - the original data frame with the error detection applied
* @group Error Handling
* @since 0.6.0
*/
def applyErrorColumnsToDataFrame(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame = {
doApplyErrorColumnsToDataFrame(dataFrame, errCols.map(_.column): _*)
}
}
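
To see how these entry points compose, here is a hedged usage sketch based only on the signatures visible in this diff. The DataFrame, column names, error literals, and the literal types assumed for ErrType, ErrCode and ErrMsg (String, Long, String) are illustrative assumptions, not taken from the library.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.spark.commons.errorhandling.ErrorHandling
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageArray
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitOnColumn

// Hedged sketch; ErrType/ErrMsg assumed to be String aliases, ErrCode a Long alias.
def markNegativeAges(df: DataFrame): DataFrame = {
  val errorHandling: ErrorHandling = ErrorMessageArray()
  // Rows satisfying the condition get the error recorded; other rows are left untouched.
  errorHandling.putError(df)(col("age") < 0)(
    ErrorMessageSubmitOnColumn("dataError", 1L, "Age cannot be negative", "age", None)
  )
}

// Two-stage variant: build the ErrorColumn first, attach it later.
def flagMissingNames(df: DataFrame): DataFrame = {
  val errorHandling: ErrorHandling = ErrorMessageArray()
  val missingName = errorHandling.createErrorAsColumn("dataError", 2L, "Name is missing", Some("name"))
  errorHandling.applyErrorColumnsToDataFrame(df)(missingName)
}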

@@ -18,11 +18,16 @@ package za.co.absa.spark.commons.errorhandling

import za.co.absa.spark.commons.errorhandling.types._

/**
* Trait collecting error definition in a format usable during Spark data processing
* @group Error Handling
* @since 0.6.0
*/
trait ErrorMessageSubmit {
def errType: ColumnOrValue[ErrType]
def errCode: ColumnOrValue[ErrCode]
def errMsg: ColumnOrValue[ErrMsg]
def errMessage: ColumnOrValue[ErrMsg]
def errColsAndValues: ColumnOrValue[ErrColsAndValues]
def additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
def additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
}
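
A hedged sketch of a purpose-built ErrorMessageSubmit implementation follows. It assumes ErrType and ErrMsg are String aliases, ErrCode is a Long alias, and that ColumnOrValue.withValue and ColumnOrValue.apply behave as used elsewhere in this PR; the class and error literals are hypothetical.

import org.apache.spark.sql.functions.{col, lit, map}
import org.apache.spark.sql.types.StringType
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.types._

// Hedged sketch; literal types of the Err* aliases are assumptions.
class NullValueError(colName: String) extends ErrorMessageSubmit {
  val errType: ColumnOrValue[ErrType] = ColumnOrValue.withValue("nullError")
  val errCode: ColumnOrValue[ErrCode] = ColumnOrValue.withValue(42L)
  val errMessage: ColumnOrValue[ErrMsg] = ColumnOrValue.withValue(s"Column $colName must not be null")
  val errColsAndValues: ColumnOrValue[ErrColsAndValues] =
    ColumnOrValue(map(lit(colName), col(colName).cast(StringType)))
  // additionalInfo keeps the default ColumnOrValue.asEmpty from the trait
}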

@@ -19,14 +19,21 @@ package za.co.absa.spark.commons.errorhandling.implementations
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, array_except, array_union, col, map_from_arrays, map_keys, map_values, struct, when}
import za.co.absa.spark.commons.adapters.TransformAdapter
import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._
import za.co.absa.spark.commons.errorhandling.partials.{ErrorHandlingCommon, EvaluateIntoErrorMessage}
import za.co.absa.spark.commons.errorhandling.ErrorHandling
import za.co.absa.spark.commons.errorhandling.partials.TransformIntoErrorMessage.FieldNames._
import za.co.absa.spark.commons.errorhandling.partials.TransformIntoErrorMessage
import za.co.absa.spark.commons.sql.functions.null_col
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

/**
* An implementation of [[ErrorHandling]] that collects errors into struct columns based on the [[za.co.absa.spark.commons.errorhandling.ErrorMessage ErrorMessage]] case class.
* Upon applying, the non-NULL error columns are aggregated into an array column which is attached to the [[org.apache.spark.sql.DataFrame spark.DataFrame]].
* In case the error column already exists in the DataFrame, the new errors are appended to it.
* @param errorColumnName - the name of the array column aggregating all the errors
*/
case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.defaultErrorColumnName)
extends ErrorHandlingCommon
with EvaluateIntoErrorMessage
extends ErrorHandling
with TransformIntoErrorMessage
with TransformAdapter {

private def decomposeMap(errorMessageColumn: Column): Column = {
@@ -35,7 +42,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default
errorMessageColumn.getField(errType) as errType,
errorMessageColumn.getField(errCode) as errCode,
errorMessageColumn.getField(errMsg) as errMsg,
map_keys(errorMessageColumn.getField(errColsAndValues)) as errCols,
map_keys(errorMessageColumn.getField(errColsAndValues)) as errSourceCols,
map_values(errorMessageColumn.getField(errColsAndValues)) as errValues,
errorMessageColumn.getField(additionInfo) as additionInfo
)
Expand All @@ -47,7 +54,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default
errorMessageColumn.getField(errType) as errType,
errorMessageColumn.getField(errCode) as errCode,
errorMessageColumn.getField(errMsg) as errMsg,
map_from_arrays(errorMessageColumn.getField(errCols), errorMessageColumn.getField(errValues)) as errColsAndValues,
map_from_arrays(errorMessageColumn.getField(errSourceCols), errorMessageColumn.getField(errValues)) as errColsAndValues,
errorMessageColumn.getField(additionInfo) as additionInfo
)
}
Expand All @@ -59,7 +66,7 @@ case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.default
dataFrame.withColumn(errorColName, reMap(array_union(deMap(col(errorColName)), colToUnion)))
}

protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
protected def doApplyErrorColumnsToDataFrame(dataFrame: DataFrame, errCols: Column*): DataFrame = {
val aggregated = array(errCols.map(decomposeMap): _*) //need to decompose the map field, as it's not supported in array functions
val aggregatedWithoutNulls = array_except(aggregated, array(null_col))
val joinToExisting: (DataFrame, String) => DataFrame = appendToErrCol(_, _, aggregatedWithoutNulls)
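
A hedged sketch of ErrorMessageArray in use with putErrorsWithGrouping; the DataFrame, column names, target column name and error literals are illustrative assumptions, and ErrCode is assumed to be numeric.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageArray
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitOnColumn
import za.co.absa.spark.commons.errorhandling.types.ErrorWhen

// Hedged sketch; names and literals are illustrative.
def validate(df: DataFrame): DataFrame = {
  val errorHandling = ErrorMessageArray(errorColumnName = "data_errors")
  // Detected errors are aggregated into the "data_errors" array column; conditions
  // bound to the same field would be grouped so only the first match is reported.
  errorHandling.putErrorsWithGrouping(df)(Seq(
    ErrorWhen(col("name").isNull, ErrorMessageSubmitOnColumn("dataError", 2L, "Name is missing", "name", None)),
    ErrorWhen(col("city").isNull, ErrorMessageSubmitOnColumn("dataError", 3L, "City is missing", "city", None))
  ))
}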
@@ -22,20 +22,42 @@ import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitJustErrorValue.noColumnKey
import za.co.absa.spark.commons.errorhandling.types._

/**
* [[za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit ErrorMessageSubmit]] subclass to represent an error not
* bound to a particular column but still having a value that caused the error.
* @param errType - error type
* @param errCode - error code
* @param errMessage - error message
* @param errValue - the value that caused the error
* @param additionalInfo - optional additional info in form of JSON
* @group Error Handling
* @since 0.6.0
*/
class ErrorMessageSubmitJustErrorValue(
val errType: ColumnOrValue[ErrType],
val errCode: ColumnOrValue[ErrCode],
val errMsg: ColumnOrValue[ErrMsg],
val errMessage: ColumnOrValue[ErrMsg],
errValue: ColumnOrValue[ErrValue],
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
override val additionalInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(map(lit(noColumnKey), errValue.column.cast(StringType)))
}

object ErrorMessageSubmitJustErrorValue {
val noColumnKey: ErrSourceColName = ""

def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: ErrValue, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitJustErrorValue = {
/**
* Convenient apply function
* @param errType - error type
* @param errCode - error code
* @param errMessage - error message
* @param errValue - the value that caused the error
* @param additionalInfo - optional additional info in form of JSON
* @return - instance of [[ErrorMessageSubmitJustErrorValue]]
* @group Error Handling
* @since 0.6.0
*/
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitJustErrorValue = {
new ErrorMessageSubmitJustErrorValue(
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
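
Finally, a hedged sketch of ErrorMessageSubmitJustErrorValue in use, based on the apply signature shown above; the DataFrame, column name, error literals and the numeric ErrCode are illustrative assumptions.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageArray
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitJustErrorValue

// Hedged sketch; names and literals are illustrative.
def flagUnknownCurrency(df: DataFrame, unknownCode: String): DataFrame = {
  val errorHandling = ErrorMessageArray()
  // The offending value is recorded under the special "no column" key, because
  // the error is not tied to a particular source column.
  val badValue = ErrorMessageSubmitJustErrorValue("lookupError", 10L, "Unknown currency code", unknownCode)
  errorHandling.putError(df)(col("currency") === unknownCode)(badValue)
}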