Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing Map for error columns and their values #85

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
* limitations under the License.
*/

package za.co.absa.spark.commons.sql
package za.co.absa.spark.commons.adapters

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.callUDF

import scala.util.{Success, Try}

// scalastyle:off
object functions2 {
// scalastyle:on

def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)

trait CallUdfAdapter {
def call_udf(udfName: String, cols: Column*): Column = callUDF(udfName, cols:_*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
* limitations under the License.
*/

package za.co.absa.spark.commons.sql
package za.co.absa.spark.commons.adapters

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.callUDF

import scala.util.{Success, Try}

// scalastyle:off
object functions2 {
// scalastyle:on

def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)
import org.apache.spark.sql.functions.{call_udf => sparkCallUdf}

trait CallUdfAdapter {
def call_udf(udfName: String, cols: Column*): Column = sparkCallUdf(udfName, cols:_*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
* limitations under the License.
*/

package za.co.absa.spark.commons.sql
package za.co.absa.spark.commons.adapters

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.callUDF

import scala.util.{Success, Try}

// scalastyle:off
object functions2 {
// scalastyle:on

def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)
import org.apache.spark.sql.functions.{call_udf => sparkCallUdf}

trait CallUdfAdapter {
def call_udf(udfName: String, cols: Column*): Column = sparkCallUdf(udfName, cols:_*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.spark.commons.errorhandling.types._
trait ErrorHandling {
def register(sparkToRegisterTo: SparkSession): Unit = {}

def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: ErrCol, additionalInfo: AdditionalInfo = None): ErrorColumn = {
def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
val toSubmit = errCol
.map(errSourceColName => ErrorMessageSubmitOnColumn(errType, errCode, errMessage, errSourceColName, additionalInfo))
.getOrElse(ErrorMessageSubmitWithoutColumn(errType, errCode, errMessage, additionalInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package za.co.absa.spark.commons.errorhandling

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping
import za.co.absa.spark.commons.errorhandling.types._

/**
Expand All @@ -27,30 +24,14 @@ import za.co.absa.spark.commons.errorhandling.types._
* @param errType - Type or source of the error
* @param errCode - Internal error code
* @param errMsg - Textual description of the error
* @param errCol - The name of the column where the error occurred
* @param rawValues - Sequence of raw values (which are the potential culprits of the error)
* @param additionInfo - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
* @param errColsAndValues - The names of the columns where the error occurred and their raw values (which are the
* potential culprits of the error)
* @param additionInfo - any optional additional information in the form of a JSON string
*/
case class ErrorMessage(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking if it wouldn't be better to use the types directly instead of type aliases. These type aliases might add some overhead for the users. I'm a bit stupid so I would be constantly forgetting what type ErrType really is 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason why I used type aliases was that I wasn't sure we are settled on the types. This way it is rather easy to change.

Also, the AdditionalInfo is actually planned to eventually be more than an alias (#90)

I am happy to discuss the other aliases once the thing is settled (aliases should be easy to remove) 👍

errType: ErrType,
errCode: ErrCode,
errMsg: ErrMsg,
errCol: ErrCol,
rawValues: RawValues,
errColsAndValues: ErrColsAndValues,
additionInfo: AdditionalInfo = None
)

object ErrorMessage {
//TODO probably not needed
case class Mapping(
mappingTableColumn: String,
mappedDatasetColumn: String
)

val errorColumnName = "errCol"
def errorColSchema(implicit spark: SparkSession): StructType = {
import spark.implicits._
spark.emptyDataset[ErrorMessage].schema
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ trait ErrorMessageSubmit {
def errType: ColumnOrValue[ErrType]
def errCode: ColumnOrValue[ErrCode]
def errMsg: ColumnOrValue[ErrMsg]
def errCol: ColumnOrValue[ErrCol]
def rawValues: ColumnOrValue[RawValues]
def errColsAndValues: ColumnOrValue[ErrColsAndValues]
def additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,54 @@

package za.co.absa.spark.commons.errorhandling.implementations

import org.apache.spark.sql.functions.{array, array_except, array_union, col}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, array_except, array_union, col, map_from_arrays, map_keys, map_values, struct, when}
import za.co.absa.spark.commons.adapters.TransformAdapter
import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._
import za.co.absa.spark.commons.errorhandling.partials.{ErrorHandlingCommon, EvaluateIntoErrorMessage}
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.spark.commons.sql.functions.null_col
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.defaultErrorColumnName)
extends ErrorHandlingCommon
with EvaluateIntoErrorMessage {

override protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
def appendToArray(dataFrame: DataFrame, colName: String, colToUnion: Column): DataFrame = {
dataFrame.withColumn(colName, array_union(col(colName), colToUnion))
}
val aggregatedWithouNulls = array_except(array(errCols: _*), array(null_col))
dataFrame.withColumnIfDoesNotExist(appendToArray(_, _, aggregatedWithouNulls))(errorColumnName, aggregatedWithouNulls)
with EvaluateIntoErrorMessage
with TransformAdapter {

private def decomposeMap(errorMessageColumn: Column): Column = {
when(errorMessageColumn.isNotNull,
struct(
errorMessageColumn.getField(errType) as errType,
errorMessageColumn.getField(errCode) as errCode,
errorMessageColumn.getField(errMsg) as errMsg,
map_keys(errorMessageColumn.getField(errColsAndValues)) as errCols,
map_values(errorMessageColumn.getField(errColsAndValues)) as errValues,
errorMessageColumn.getField(additionInfo) as additionInfo
)
)
}

private def recomposeMap(errorMessageColumn: Column): Column = {
struct(
errorMessageColumn.getField(errType) as errType,
errorMessageColumn.getField(errCode) as errCode,
errorMessageColumn.getField(errMsg) as errMsg,
map_from_arrays(errorMessageColumn.getField(errCols), errorMessageColumn.getField(errValues)) as errColsAndValues ,
errorMessageColumn.getField(additionInfo) as additionInfo
)
}

private def deMap(arrayCol: Column): Column = transform(arrayCol, decomposeMap)
private def reMap(arrayCol: Column): Column = transform(arrayCol, recomposeMap)

private def appendToErrCol(dataFrame: DataFrame, errorColName: String, colToUnion: Column): DataFrame = {
dataFrame.withColumn(errorColName, reMap(array_union(deMap(col(errorColName)), colToUnion)))
}

protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about whether we should use the word "aggregation" here and also in the ErrorHandling trait. So I searched the dictionary for the definition of the word, and it fits here, but when I first started reading the code, I thought that there would be some df.groupBy(...).agg(...), i.e. some squishing of rows and then some calculation 😄 But if I understood correctly, it's more about combining the errors into one column (or actually, I guess it depends on the implementation).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Actually, you inspired me — since it's somewhat of an internal function, it can be a little more eloquent. Hopefully that will improve the understanding. 😉

val aggregated = array(errCols.map(decomposeMap): _*) //need to decompose the map field, as it's not supported in array functions
val aggregatedWithoutNulls = array_except(aggregated, array(null_col))
val joinToExisting: (DataFrame, String) => DataFrame = appendToErrCol(_, _, aggregatedWithoutNulls)
dataFrame.withColumnIfDoesNotExist(joinToExisting)(errorColumnName, reMap(aggregatedWithoutNulls))
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.errorhandling.implementations

import org.apache.spark.sql.functions.{lit, map}
import org.apache.spark.sql.types.StringType
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageSubmitJustErrorValue.noColumnKey
import za.co.absa.spark.commons.errorhandling.types._

case class ErrorMessageSubmitJustErrorValue(
errType: ColumnOrValue[ErrType],
errCode: ColumnOrValue[ErrCode],
errMsg: ColumnOrValue[ErrMsg],
errValue: ColumnOrValue[String],
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(map(lit(noColumnKey), errValue.column.cast(StringType)))
}

object ErrorMessageSubmitJustErrorValue {
val noColumnKey: ErrSourceColName = ""

def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String): ErrorMessageSubmitJustErrorValue = {
new ErrorMessageSubmitJustErrorValue(
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
ColumnOrValue.withValue(errMessage),
ColumnOrValue.withValue(errValue),
ColumnOrValue.asEmpty
)
}

def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String, additionalInfo: AdditionalInfo): ErrorMessageSubmitJustErrorValue = {
new ErrorMessageSubmitJustErrorValue(
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
ColumnOrValue.withValue(errMessage),
ColumnOrValue.withValue(errValue),
ColumnOrValue.withOption(additionalInfo)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,24 @@

package za.co.absa.spark.commons.errorhandling.implementations

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.StringType
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitOnColumn (
val errType: ColumnOrValue[ErrType],
val errCode: ColumnOrValue[ErrCode],
val errMsg: ColumnOrValue[ErrMsg],
errSourceColName: ErrSourceColName,
errType: ColumnOrValue[ErrType],
errCode: ColumnOrValue[ErrCode],
errMsg: ColumnOrValue[ErrMsg],
errColName: ErrSourceColName,
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
val errCol: ColumnOrValue[ErrCol] = ColumnOrValue.withOption(Option(errSourceColName))
override def rawValues: ColumnOrValue[RawValues] = {
val colExpr: Column = array(col(errSourceColName).cast(StringType))
ColumnOrValue(colExpr)
}
) extends ErrorMessageSubmitOnMoreColumns(errType, errCode, errMsg, Set(errColName), additionInfo) {
}

object ErrorMessageSubmitOnColumn {
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = {
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = {
new ErrorMessageSubmitOnColumn(
ColumnOrValue.withActualValue(errType),
ColumnOrValue.withActualValue(errCode),
ColumnOrValue.withActualValue(errMessage),
errSourceColName,
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
ColumnOrValue.withValue(errMessage),
errColName,
ColumnOrValue.withOption(additionalInfo)
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a new line after line 39

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do that, if it improves reading. But to be honest I probably forget more often 😊

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.errorhandling.implementations

import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.types.ColumnOrValue.columnNameToItsStringValue
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitOnMoreColumns(
val errType: ColumnOrValue[ErrType],
val errCode: ColumnOrValue[ErrCode],
val errMsg: ColumnOrValue[ErrMsg],
errColNames: Set[ErrSourceColName],
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(errColNames, columnNameToItsStringValue)

}

object ErrorMessageSubmitOnMoreColumns {
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColNames: Set[ErrSourceColName], additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnMoreColumns = {
new ErrorMessageSubmitOnMoreColumns(
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
ColumnOrValue.withValue(errMessage),
errColNames,
ColumnOrValue.withOption(additionalInfo)
)
}
}
Copy link
Contributor

@TebaleloS TebaleloS Mar 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new line after line 44

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's there. File has 45 lines. .editorconfig file is actually set to ensure it's always there.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package za.co.absa.spark.commons.errorhandling.implementations

import org.apache.spark.sql.functions.array
import org.apache.spark.sql.functions.typedLit
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageSubmitWithoutColumn.emptyErrorColsAndValues
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitWithoutColumn(
Expand All @@ -27,16 +28,17 @@ class ErrorMessageSubmitWithoutColumn(
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {

val errCol: ColumnOrValue[ErrCol] = ColumnOrValue.asEmpty
val rawValues: ColumnOrValue[RawValues] = ColumnOrValue(array())
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(typedLit(emptyErrorColsAndValues))
}

object ErrorMessageSubmitWithoutColumn {
private val emptyErrorColsAndValues: ErrColsAndValues = Map.empty

def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitWithoutColumn = {
new ErrorMessageSubmitWithoutColumn(
ColumnOrValue.withActualValue(errType),
ColumnOrValue.withActualValue(errCode),
ColumnOrValue.withActualValue(errMessage),
ColumnOrValue.withValue(errType),
ColumnOrValue.withValue(errCode),
ColumnOrValue.withValue(errMessage),
ColumnOrValue.withOption(additionalInfo)
)
}
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a newline at the end (after line 58)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above 😉

Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ trait ErrorHandlingCommon extends ErrorHandling {
}

def putErrorsWithGrouping(dataFrame: DataFrame)(errorsWhen: Seq[ErrorWhen]): DataFrame = {
val errorsByColumn = errorsWhen.groupBy(_.errorMessageSubmit.errCol.getValue)
val errorColumns1 = errorsByColumn.getOrElse(None, Seq.empty).map(errorWhenToCol) // no grouping without ErrCol name
val errorColumns2 = (errorsByColumn - None).values.map(errorWhenSeqToCol).toSeq
register(dataFrame.sparkSession)
val errorsByColumn = errorsWhen.groupBy(_.errorMessageSubmit.errColsAndValues.columnNames)
val noColNames = Set.empty[String]
val errorColumns1 = errorsByColumn.getOrElse(noColNames, Seq.empty).map(errorWhenToCol) // no grouping without ErrCol names
val errorColumns2 = (errorsByColumn - noColNames).values.map(errorWhenSeqToCol).toSeq
doTheAggregation(dataFrame, errorColumns1 ++ errorColumns2: _*)
}

Expand Down
Loading