-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing Map for error columns and their values #85
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,22 +16,54 @@ | |
|
||
package za.co.absa.spark.commons.errorhandling.implementations | ||
|
||
import org.apache.spark.sql.functions.{array, array_except, array_union, col} | ||
import org.apache.spark.sql.{Column, DataFrame} | ||
import org.apache.spark.sql.functions.{array, array_except, array_union, col, map_from_arrays, map_keys, map_values, struct, when} | ||
import za.co.absa.spark.commons.adapters.TransformAdapter | ||
import za.co.absa.spark.commons.errorhandling.partials.EvaluateIntoErrorMessage.FieldNames._ | ||
import za.co.absa.spark.commons.errorhandling.partials.{ErrorHandlingCommon, EvaluateIntoErrorMessage} | ||
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements | ||
import za.co.absa.spark.commons.sql.functions.null_col | ||
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements | ||
|
||
case class ErrorMessageArray(errorColumnName: String = ErrorMessageArray.defaultErrorColumnName) | ||
extends ErrorHandlingCommon | ||
with EvaluateIntoErrorMessage { | ||
|
||
override protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = { | ||
def appendToArray(dataFrame: DataFrame, colName: String, colToUnion: Column): DataFrame = { | ||
dataFrame.withColumn(colName, array_union(col(colName), colToUnion)) | ||
} | ||
val aggregatedWithouNulls = array_except(array(errCols: _*), array(null_col)) | ||
dataFrame.withColumnIfDoesNotExist(appendToArray(_, _, aggregatedWithouNulls))(errorColumnName, aggregatedWithouNulls) | ||
with EvaluateIntoErrorMessage | ||
with TransformAdapter { | ||
|
||
/**
 * Rewrites an error-message struct into an equivalent struct that contains no map field.
 *
 * The fields named by `errType`, `errCode`, `errMsg` and `additionInfo` are copied over,
 * while the `errColsAndValues` map is replaced by two array fields: `errCols` (the map keys)
 * and `errValues` (the map values). The map has to go because map-typed fields are not
 * supported by the array functions used during aggregation.
 *
 * The struct is only built when the input is not null; there is no `otherwise` branch, so
 * (per Spark's documented `when` behaviour) a null input yields null.
 *
 * @param errorMessageColumn error-message struct column carrying an `errColsAndValues` map
 * @return the map-free struct column; `recomposeMap` performs the opposite conversion
 */
private def decomposeMap(errorMessageColumn: Column): Column = { | ||
when(errorMessageColumn.isNotNull, | ||
struct( | ||
errorMessageColumn.getField(errType) as errType, | ||
errorMessageColumn.getField(errCode) as errCode, | ||
errorMessageColumn.getField(errMsg) as errMsg, | ||
map_keys(errorMessageColumn.getField(errColsAndValues)) as errCols, | ||
map_values(errorMessageColumn.getField(errColsAndValues)) as errValues, | ||
errorMessageColumn.getField(additionInfo) as additionInfo | ||
) | ||
) | ||
} | ||
|
||
/**
 * Rebuilds the map-carrying error-message struct from its decomposed (map-free) form.
 *
 * The fields named by `errType`, `errCode`, `errMsg` and `additionInfo` are copied over, and
 * the `errCols` / `errValues` arrays are combined with `map_from_arrays` into the
 * `errColsAndValues` map field.
 *
 * NOTE(review): unlike `decomposeMap` there is no null guard here - presumably null entries
 * are expected to have been removed before this is applied; confirm against the callers.
 *
 * @param errorMessageColumn decomposed error-message struct with `errCols` and `errValues` fields
 * @return struct column with the `errColsAndValues` map restored
 */
private def recomposeMap(errorMessageColumn: Column): Column = { | ||
struct( | ||
errorMessageColumn.getField(errType) as errType, | ||
errorMessageColumn.getField(errCode) as errCode, | ||
errorMessageColumn.getField(errMsg) as errMsg, | ||
map_from_arrays(errorMessageColumn.getField(errCols), errorMessageColumn.getField(errValues)) as errColsAndValues , | ||
errorMessageColumn.getField(additionInfo) as additionInfo | ||
) | ||
} | ||
|
||
// Passes `arrayCol` and `decomposeMap` to `transform`.
// NOTE(review): `transform` is not defined in the visible code - presumably it comes from
// TransformAdapter and applies the function to every array element; confirm.
private def deMap(arrayCol: Column): Column = transform(arrayCol, decomposeMap) | ||
// Counterpart of `deMap`: passes `arrayCol` and `recomposeMap` to `transform`.
// NOTE(review): `transform` is not defined in the visible code - presumably it comes from
// TransformAdapter and applies the function to every array element; confirm.
private def reMap(arrayCol: Column): Column = transform(arrayCol, recomposeMap) | ||
|
||
/**
 * Replaces column `errorColName` with the union of its current content and `colToUnion`.
 *
 * The existing column is first converted with `deMap`, then `array_union` is applied, and the
 * result is converted back with `reMap`. `colToUnion` is passed to `array_union` as is, so it
 * has to be in the decomposed (map-free) form already.
 *
 * @param dataFrame    data frame that contains the column `errorColName`
 * @param errorColName name of the error column to overwrite
 * @param colToUnion   array column of decomposed error structs to merge in
 * @return the data frame with `errorColName` replaced by the merged array
 */
private def appendToErrCol(dataFrame: DataFrame, errorColName: String, colToUnion: Column): DataFrame = { | ||
dataFrame.withColumn(errorColName, reMap(array_union(deMap(col(errorColName)), colToUnion))) | ||
} | ||
|
||
protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was wondering whether we should use the word "aggregation" here and also in There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks. Actually, you inspired me: as it's a somewhat internal function, its name can be a little more eloquent. Hopefully that will improve understanding. 😉 |
||
val aggregated = array(errCols.map(decomposeMap): _*) //need to decompose the map field, as it's not supported in array functions | ||
val aggregatedWithoutNulls = array_except(aggregated, array(null_col)) | ||
val joinToExisting: (DataFrame, String) => DataFrame = appendToErrCol(_, _, aggregatedWithoutNulls) | ||
dataFrame.withColumnIfDoesNotExist(joinToExisting)(errorColumnName, reMap(aggregatedWithoutNulls)) | ||
} | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.spark.commons.errorhandling.implementations | ||
|
||
import org.apache.spark.sql.functions.{lit, map} | ||
import org.apache.spark.sql.types.StringType | ||
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit | ||
import za.co.absa.spark.commons.errorhandling.implementations.ErrorMessageSubmitJustErrorValue.noColumnKey | ||
import za.co.absa.spark.commons.errorhandling.types._ | ||
|
||
/**
 * An `ErrorMessageSubmit` that carries a single error value together with the error type,
 * code and message.
 *
 * @param errType      error type, as a `ColumnOrValue`
 * @param errCode      error code, as a `ColumnOrValue`
 * @param errMsg       error message, as a `ColumnOrValue`
 * @param errValue     the value to report; its column is cast to `StringType`
 * @param additionInfo additional information; defaults to `ColumnOrValue.asEmpty`
 */
case class ErrorMessageSubmitJustErrorValue( | ||
errType: ColumnOrValue[ErrType], | ||
errCode: ColumnOrValue[ErrCode], | ||
errMsg: ColumnOrValue[ErrMsg], | ||
errValue: ColumnOrValue[String], | ||
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty | ||
) extends ErrorMessageSubmit { | ||
// Single-entry map: the key is the literal `noColumnKey`, the value is `errValue` cast to string.
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(map(lit(noColumnKey), errValue.column.cast(StringType))) | ||
} | ||
|
||
/** Companion object: holds the map key used for the error value and factories taking plain values. */
object ErrorMessageSubmitJustErrorValue { | ||
// Key under which the error value is stored in `errColsAndValues` (an empty string).
val noColumnKey: ErrSourceColName = "" | ||
|
||
// Builds the submit from plain values, wrapping each with `ColumnOrValue.withValue`;
// the additional info is left empty (`ColumnOrValue.asEmpty`).
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String): ErrorMessageSubmitJustErrorValue = { | ||
new ErrorMessageSubmitJustErrorValue( | ||
ColumnOrValue.withValue(errType), | ||
ColumnOrValue.withValue(errCode), | ||
ColumnOrValue.withValue(errMessage), | ||
ColumnOrValue.withValue(errValue), | ||
ColumnOrValue.asEmpty | ||
) | ||
} | ||
|
||
// Same as the overload above, but `additionalInfo` is supplied by the caller and wrapped
// with `ColumnOrValue.withOption`.
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errValue: String, additionalInfo: AdditionalInfo): ErrorMessageSubmitJustErrorValue = { | ||
new ErrorMessageSubmitJustErrorValue( | ||
ColumnOrValue.withValue(errType), | ||
ColumnOrValue.withValue(errCode), | ||
ColumnOrValue.withValue(errMessage), | ||
ColumnOrValue.withValue(errValue), | ||
ColumnOrValue.withOption(additionalInfo) | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,33 +16,24 @@ | |
|
||
package za.co.absa.spark.commons.errorhandling.implementations | ||
|
||
import org.apache.spark.sql.Column | ||
import org.apache.spark.sql.functions.{array, col} | ||
import org.apache.spark.sql.types.StringType | ||
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit | ||
import za.co.absa.spark.commons.errorhandling.types._ | ||
|
||
class ErrorMessageSubmitOnColumn ( | ||
val errType: ColumnOrValue[ErrType], | ||
val errCode: ColumnOrValue[ErrCode], | ||
val errMsg: ColumnOrValue[ErrMsg], | ||
errSourceColName: ErrSourceColName, | ||
errType: ColumnOrValue[ErrType], | ||
errCode: ColumnOrValue[ErrCode], | ||
errMsg: ColumnOrValue[ErrMsg], | ||
errColName: ErrSourceColName, | ||
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty | ||
) extends ErrorMessageSubmit { | ||
val errCol: ColumnOrValue[ErrCol] = ColumnOrValue.withOption(Option(errSourceColName)) | ||
override def rawValues: ColumnOrValue[RawValues] = { | ||
val colExpr: Column = array(col(errSourceColName).cast(StringType)) | ||
ColumnOrValue(colExpr) | ||
} | ||
) extends ErrorMessageSubmitOnMoreColumns(errType, errCode, errMsg, Set(errColName), additionInfo) { | ||
} | ||
|
||
object ErrorMessageSubmitOnColumn { | ||
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errSourceColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = { | ||
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColName: ErrSourceColName, additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnColumn = { | ||
new ErrorMessageSubmitOnColumn( | ||
ColumnOrValue.withActualValue(errType), | ||
ColumnOrValue.withActualValue(errCode), | ||
ColumnOrValue.withActualValue(errMessage), | ||
errSourceColName, | ||
ColumnOrValue.withValue(errType), | ||
ColumnOrValue.withValue(errCode), | ||
ColumnOrValue.withValue(errMessage), | ||
errColName, | ||
ColumnOrValue.withOption(additionalInfo) | ||
) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a new line after line 39 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can do that, if it improves reading. But to be honest I probably forget more often 😊 |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.spark.commons.errorhandling.implementations | ||
|
||
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit | ||
import za.co.absa.spark.commons.errorhandling.types.ColumnOrValue.columnNameToItsStringValue | ||
import za.co.absa.spark.commons.errorhandling.types._ | ||
|
||
/**
 * An `ErrorMessageSubmit` for an error that relates to a set of source columns.
 *
 * @param errType      error type, as a `ColumnOrValue`
 * @param errCode      error code, as a `ColumnOrValue`
 * @param errMsg       error message, as a `ColumnOrValue`
 * @param errColNames  names of the columns the error relates to
 * @param additionInfo additional information; defaults to `ColumnOrValue.asEmpty`
 */
class ErrorMessageSubmitOnMoreColumns( | ||
val errType: ColumnOrValue[ErrType], | ||
val errCode: ColumnOrValue[ErrCode], | ||
val errMsg: ColumnOrValue[ErrMsg], | ||
errColNames: Set[ErrSourceColName], | ||
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty | ||
) extends ErrorMessageSubmit { | ||
// Built from the column names and `columnNameToItsStringValue`.
// NOTE(review): presumably this maps each column name to that column's value as a string -
// confirm against `ColumnOrValue`, which is not visible here.
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(errColNames, columnNameToItsStringValue) | ||
|
||
} | ||
|
||
/** Companion object providing a factory that takes plain values. */
object ErrorMessageSubmitOnMoreColumns { | ||
// Wraps `errType`, `errCode` and `errMessage` with `ColumnOrValue.withValue` and
// `additionalInfo` (default `None`) with `ColumnOrValue.withOption`; `errColNames` is passed through.
def apply(errType: ErrType, errCode: ErrCode, errMessage: ErrMsg, errColNames: Set[ErrSourceColName], additionalInfo: AdditionalInfo= None): ErrorMessageSubmitOnMoreColumns = { | ||
new ErrorMessageSubmitOnMoreColumns( | ||
ColumnOrValue.withValue(errType), | ||
ColumnOrValue.withValue(errCode), | ||
ColumnOrValue.withValue(errMessage), | ||
errColNames, | ||
ColumnOrValue.withOption(additionalInfo) | ||
) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a new line after line 44 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's there. File has 45 lines. |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a newline at the end (after line 58) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above 😉 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering whether it would be better to use the types directly instead of type aliases. These type aliases might add some overhead for the users. I'm a bit stupid, so I would constantly be forgetting what type
ErrType
really is 😄 There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I used type aliases is that I wasn't sure we had settled on the types. This way they are rather easy to change.
Also the
AdditionalInfo
is actually planned to become more than an alias eventually (#90). I am happy to discuss the other aliases once the thing is settled (aliases should be easy to remove) 👍