Introducing Map for error columns and their values #85

Merged
The `functions2` object moves from the `sql` package into a `CallUdfAdapter` trait in a new `adapters` package. Three copies of the file change the same way (evidently one per cross-built Spark version); the first keeps delegating to `callUDF`:

```diff
@@ -14,17 +14,11 @@
  * limitations under the License.
  */
 
-package za.co.absa.spark.commons.sql
+package za.co.absa.spark.commons.adapters
 
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.callUDF
 
-import scala.util.{Success, Try}
-
-// scalastyle:off
-object functions2 {
-// scalastyle:on
-
-  def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)
-
+trait CallUdfAdapter {
+  def call_udf(udfName: String, cols: Column*): Column = callUDF(udfName, cols:_*)
 }
```
The second copy instead delegates to Spark's own `call_udf`, renamed on import so it does not clash with the trait's method:

```diff
@@ -14,17 +14,11 @@
  * limitations under the License.
  */
 
-package za.co.absa.spark.commons.sql
+package za.co.absa.spark.commons.adapters
 
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.functions.callUDF
-
-import scala.util.{Success, Try}
-
-// scalastyle:off
-object functions2 {
-// scalastyle:on
-
-  def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)
-
+import org.apache.spark.sql.functions.{call_udf => sparkCallUdf}
+
+trait CallUdfAdapter {
+  def call_udf(udfName: String, cols: Column*): Column = sparkCallUdf(udfName, cols:_*)
 }
```
The third copy is identical:

```diff
@@ -14,17 +14,11 @@
  * limitations under the License.
  */
 
-package za.co.absa.spark.commons.sql
+package za.co.absa.spark.commons.adapters
 
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.functions.callUDF
-
-import scala.util.{Success, Try}
-
-// scalastyle:off
-object functions2 {
-// scalastyle:on
-
-  def call_udf(udfName: String, cols: Column*): Column = call_udf(udfName, cols:_*)
-
+import org.apache.spark.sql.functions.{call_udf => sparkCallUdf}
+
+trait CallUdfAdapter {
+  def call_udf(udfName: String, cols: Column*): Column = sparkCallUdf(udfName, cols:_*)
 }
```
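Taken together, these adapters shim the UDF-invocation API difference across Spark versions behind one `call_udf` method. A minimal usage sketch (hypothetical consumer code; the registered UDF and data are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.spark.commons.adapters.CallUdfAdapter

object CallUdfDemo extends CallUdfAdapter {
  def run(spark: SparkSession): Unit = {
    import spark.implicits._
    spark.udf.register("toUpper", (s: String) => s.toUpperCase) // illustrative UDF
    val df = Seq("a", "b").toDF("letter")
    // call_udf resolves to callUDF or to Spark's own call_udf, depending on
    // which adapter copy the compiled module carries.
    df.select(call_udf("toUpper", col("letter"))).show()
  }
}
```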
In the `ErrorHandling` trait, the submit implementations move to a `submits` subpackage, and `putErrorToColumn` now takes the error source column as an explicit `Option[ErrSourceColName]`:

```diff
@@ -17,13 +17,11 @@
 package za.co.absa.spark.commons.errorhandling
 
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import za.co.absa.spark.commons.errorhandling.implementations.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
+import za.co.absa.spark.commons.errorhandling.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
 import za.co.absa.spark.commons.errorhandling.types._
 
 trait ErrorHandling {
   def register(sparkToRegisterTo: SparkSession): Unit = {}
 
-  def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: ErrCol, additionalInfo: AdditionalInfo = None): ErrorColumn = {
+  def putErrorToColumn(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errCol: Option[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorColumn = {
     val toSubmit = errCol
       .map(errSourceColName => ErrorMessageSubmitOnColumn(errType, errCode, errMessage, errSourceColName, additionalInfo))
       .getOrElse(ErrorMessageSubmitWithoutColumn(errType, errCode, errMessage, additionalInfo))
```
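A hedged sketch of how a caller would use the new signature, assuming `ErrorMessageArray` (below) is a concrete `ErrorHandling` implementation via `ErrorHandlingCommon`, and that `ErrType`/`ErrMsg` alias `String` while `ErrCode` is numeric:

```scala
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageArray

val errorHandling = ErrorMessageArray()

// With a source column name: routed to ErrorMessageSubmitOnColumn.
val castError = errorHandling.putErrorToColumn(
  "Cast", 12L, "Failed to cast to numeric", Some("amount"))

// Without one: routed to ErrorMessageSubmitWithoutColumn.
val globalError = errorHandling.putErrorToColumn(
  "Runtime", 99L, "Unexpected failure", None)
```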
In the `ErrorMessage` case class, the former `errCol`/`rawValues` pair becomes a single `errColsAndValues` map, and the companion object (the `Mapping` helper and schema utilities) is dropped:

```diff
@@ -16,9 +16,6 @@
 
 package za.co.absa.spark.commons.errorhandling
 
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.StructType
-import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping
 import za.co.absa.spark.commons.errorhandling.types._
 
 /**
@@ -27,30 +24,14 @@ import za.co.absa.spark.commons.errorhandling.types._
  * @param errType - Type or source of the error
 * @param errCode - Internal error code
 * @param errMsg - Textual description of the error
- * @param errCol - The name of the column where the error occurred
- * @param rawValues - Sequence of raw values (which are the potential culprits of the error)
- * @param additionInfo - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
+ * @param errColsAndValues - The names of the columns where the error occurred and their raw values (which are the
+ *                           potential culprits of the error)
+ * @param additionInfo - any optional additional information in the form of a JSON string
  */
 case class ErrorMessage(
   errType: ErrType,
   errCode: ErrCode,
   errMsg: ErrMsg,
-  errCol: ErrCol,
-  rawValues: RawValues,
+  errColsAndValues: ErrColsAndValues,
   additionInfo: AdditionalInfo = None
 )
-
-object ErrorMessage {
-  //TODO probably not needed
-  case class Mapping(
-    mappingTableColumn: String,
-    mappedDatasetColumn: String
-  )
-
-  val errorColumnName = "errCol"
-  def errorColSchema(implicit spark: SparkSession): StructType = {
-    import spark.implicits._
-    spark.emptyDataset[ErrorMessage].schema
-  }
-}
```

Review thread on the `case class ErrorMessage(` line:

Reviewer: I was wondering whether it wouldn't be better to use the types directly instead of type aliases. These aliases might add some overhead for users. I'm a bit stupid, so I would constantly be forgetting what type `ErrType` really is 😄

Author (contributor): The reason I used type aliases was that I wasn't sure we were settled on the types; this way they are rather easy to change. Also, `AdditionalInfo` is actually planned to become more than an alias eventually (#90). I am happy to discuss the other aliases once the thing is settled (aliases should be easy to remove) 👍
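To make the new shape concrete, a small illustrative construction (assuming, per the aliases, that `errColsAndValues` is a map from column name to raw string value):

```scala
import za.co.absa.spark.commons.errorhandling.ErrorMessage

// One error involving two source columns; the map pairs each column name
// with the raw value that is the potential culprit.
val err = ErrorMessage(
  errType = "Confirmation",
  errCode = 201L,
  errMsg = "Values of the listed columns do not match",
  errColsAndValues = Map("first_name" -> "Jon", "last_name" -> "Snowe")
)
```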

The `ErrorMessageSubmit` contract changes accordingly:

```diff
@@ -22,8 +22,7 @@ trait ErrorMessageSubmit {
   def errType: ColumnOrValue[ErrType]
   def errCode: ColumnOrValue[ErrCode]
   def errMsg: ColumnOrValue[ErrMsg]
-  def errCol: ColumnOrValue[ErrCol]
-  def rawValues: ColumnOrValue[RawValues]
+  def errColsAndValues: ColumnOrValue[ErrColsAndValues]
   def additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
 }
```

In `ErrorMessageArray`, aggregation now has to work around the fact that Spark's array functions cannot handle structs containing a map field: each error struct is decomposed (the map split into key and value arrays) before `array_except`/`array_union`, and recomposed afterwards:

```diff
@@ -16,22 +16,54 @@
 
 package za.co.absa.spark.commons.errorhandling.implementations
 
-import org.apache.spark.sql.functions.{array, array_except, array_union, col}
 import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.{array, array_except, array_union, col, map_from_arrays, map_keys, map_values, struct, when}
+import za.co.absa.spark.commons.adapters.TransformAdapter
+import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._
 import za.co.absa.spark.commons.errorhandling.partials.{ErrorHandlingCommon, EvaluateIntoErrorMessage}
-import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 import za.co.absa.spark.commons.sql.functions.null_col
+import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
 
 case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.defaultErrorColumnName)
   extends ErrorHandlingCommon
-  with EvaluateIntoErrorMessage {
-
-  override protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
-    def appendToArray(dataFrame: DataFrame, colName: String, colToUnion: Column): DataFrame = {
-      dataFrame.withColumn(colName, array_union(col(colName), colToUnion))
-    }
-    val aggregatedWithouNulls = array_except(array(errCols: _*), array(null_col))
-    dataFrame.withColumnIfDoesNotExist(appendToArray(_, _, aggregatedWithouNulls))(errorColumnName, aggregatedWithouNulls)
+  with EvaluateIntoErrorMessage
+  with TransformAdapter {
+
+  private def decomposeMap(errorMessageColumn: Column): Column = {
+    when(errorMessageColumn.isNotNull,
+      struct(
+        errorMessageColumn.getField(errType) as errType,
+        errorMessageColumn.getField(errCode) as errCode,
+        errorMessageColumn.getField(errMsg) as errMsg,
+        map_keys(errorMessageColumn.getField(errColsAndValues)) as errCols,
+        map_values(errorMessageColumn.getField(errColsAndValues)) as errValues,
+        errorMessageColumn.getField(additionInfo) as additionInfo
+      )
+    )
+  }
+
+  private def recomposeMap(errorMessageColumn: Column): Column = {
+    struct(
+      errorMessageColumn.getField(errType) as errType,
+      errorMessageColumn.getField(errCode) as errCode,
+      errorMessageColumn.getField(errMsg) as errMsg,
+      map_from_arrays(errorMessageColumn.getField(errCols), errorMessageColumn.getField(errValues)) as errColsAndValues,
+      errorMessageColumn.getField(additionInfo) as additionInfo
+    )
+  }
+
+  private def deMap(arrayCol: Column): Column = transform(arrayCol, decomposeMap)
+  private def reMap(arrayCol: Column): Column = transform(arrayCol, recomposeMap)
+
+  private def appendToErrCol(dataFrame: DataFrame, errorColName: String, colToUnion: Column): DataFrame = {
+    dataFrame.withColumn(errorColName, reMap(array_union(deMap(col(errorColName)), colToUnion)))
+  }
+
+  protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
+    val aggregated = array(errCols.map(decomposeMap): _*) // need to decompose the map field, as it's not supported in array functions
+    val aggregatedWithoutNulls = array_except(aggregated, array(null_col))
+    val joinToExisting: (DataFrame, String) => DataFrame = appendToErrCol(_, _, aggregatedWithoutNulls)
+    dataFrame.withColumnIfDoesNotExist(joinToExisting)(errorColumnName, reMap(aggregatedWithoutNulls))
   }
 
 }
```
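The decompose/recompose pair exists because Spark cannot run array set operations over structs that contain a `MapType` field, while parallel key/value arrays compare fine. A self-contained sketch of the same trick (illustrative column names; it uses Spark 3's `functions.transform` directly, where the class above goes through `TransformAdapter` for cross-version support):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("map-demo").getOrCreate()
import spark.implicits._

// An array of structs, each carrying a map field.
val df = Seq((1, "a"), (2, "b")).toDF("id", "v")
  .withColumn("errs", array(struct(map(col("v"), col("id").cast("string")) as "m")))

// Flatten each struct's map into comparable key/value arrays...
val deMapped = df.withColumn("errs", transform(col("errs"),
  e => struct(map_keys(e.getField("m")) as "ks", map_values(e.getField("m")) as "vs")))

// ...run the set operation (it would fail on the original, map-bearing structs)...
val deduped = deMapped.withColumn("errs", array_union(col("errs"), col("errs")))

// ...then rebuild the map field.
val reMapped = deduped.withColumn("errs", transform(col("errs"),
  e => struct(map_from_arrays(e.getField("ks"), e.getField("vs")) as "m")))

reMapped.show(truncate = false)
```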

(One file was deleted in this PR; its contents are not shown in this view.)

New file (47 lines): the `ErrorMessageSubmitJustErrorValue` submit, for when only the offending value is known:

```scala
/*
 * Copyright 2021 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.spark.commons.errorhandling.implementations.submits

import org.apache.spark.sql.functions.{lit, map}
import org.apache.spark.sql.types.StringType
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitJustErrorValue.noColumnKey
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitJustErrorValue(
  val errType: ColumnOrValue[ErrType],
  val errCode: ColumnOrValue[ErrCode],
  val errMsg: ColumnOrValue[ErrMsg],
  errValue: ColumnOrValue[String],
  override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
  val errColsAndValues: ColumnOrValue[ErrColsAndValues] =
    ColumnOrValue(map(lit(noColumnKey), errValue.column.cast(StringType)))
}

object ErrorMessageSubmitJustErrorValue {
  val noColumnKey: ErrSourceColName = ""

  def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitJustErrorValue = {
    new ErrorMessageSubmitJustErrorValue(
      ColumnOrValue.withValue(errType),
      ColumnOrValue.withValue(errCode),
      ColumnOrValue.withValue(errMessage),
      ColumnOrValue.withValue(errValue),
      ColumnOrValue.withOption(additionalInfo)
    )
  }
}
```
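A possible call through the companion `apply` (values illustrative; numeric `ErrCode` assumed, as in the earlier sketches):

```scala
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitJustErrorValue

// No source column is known, only the offending value: the resulting map
// uses noColumnKey ("") as its single key, paired with the stringified value.
val submit = ErrorMessageSubmitJustErrorValue(
  "Type cast", 3L, "Not a valid number", "nineteen")
```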
New file (41 lines): `ErrorMessageSubmitOnColumn`, now a thin specialization of `ErrorMessageSubmitOnMoreColumns` over a single-element set (license header as above):

```scala
package za.co.absa.spark.commons.errorhandling.implementations.submits

import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitOnColumn(
  errType: ColumnOrValue[ErrType],
  errCode: ColumnOrValue[ErrCode],
  errMsg: ColumnOrValue[ErrMsg],
  errColName: ErrSourceColName,
  override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmitOnMoreColumns(errType, errCode, errMsg, Set(errColName), additionInfo) {
}

object ErrorMessageSubmitOnColumn {

  def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColName: ErrSourceColName, additionalInfo: AdditionalInfo = None): ErrorMessageSubmitOnColumn = {
    new ErrorMessageSubmitOnColumn(
      ColumnOrValue.withValue(errType),
      ColumnOrValue.withValue(errCode),
      ColumnOrValue.withValue(errMessage),
      errColName,
      ColumnOrValue.withOption(additionalInfo)
    )
  }
}
```
New file (44 lines): `ErrorMessageSubmitOnMoreColumns`, which builds the map from a set of column names, each paired with its string value (license header as above):

```scala
package za.co.absa.spark.commons.errorhandling.implementations.submits

import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.types.ColumnOrValue.columnNameToItsStringValue
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitOnMoreColumns(
  val errType: ColumnOrValue[ErrType],
  val errCode: ColumnOrValue[ErrCode],
  val errMsg: ColumnOrValue[ErrMsg],
  errColNames: Set[ErrSourceColName],
  override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
  val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(errColNames, columnNameToItsStringValue)
}

object ErrorMessageSubmitOnMoreColumns {
  def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColNames: Set[ErrSourceColName], additionalInfo: AdditionalInfo = None): ErrorMessageSubmitOnMoreColumns = {
    new ErrorMessageSubmitOnMoreColumns(
      ColumnOrValue.withValue(errType),
      ColumnOrValue.withValue(errCode),
      ColumnOrValue.withValue(errMessage),
      errColNames,
      ColumnOrValue.withOption(additionalInfo)
    )
  }
}
```
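And a matching sketch for the multi-column submit (illustrative values; each listed column name becomes a map key paired with that column's stringified value at evaluation time):

```scala
import za.co.absa.spark.commons.errorhandling.implementations.submits.ErrorMessageSubmitOnMoreColumns

val submit = ErrorMessageSubmitOnMoreColumns(
  "Confirmation", 201L, "Values of the listed columns do not match",
  Set("first_name", "middle_name", "last_name"))
```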