From 38ad8a6ca2a150f8e15d87113d3e214a8aecb2e3 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 6 Apr 2023 16:30:31 +0200 Subject: [PATCH] Fixes #93 - added ErrorHandlingFilterRowsWithErrors test file --- .../ErrorHandlingFilterRowsWithErrors.scala | 18 +++++++++--------- ...ErrorHandlingFilterRowsWithErrorsTest.scala | 6 ++++++ 2 files changed, 15 insertions(+), 9 deletions(-) create mode 100644 spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala diff --git a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala index cdbee9bb..6a9fbce7 100644 --- a/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala +++ b/spark-commons/src/main/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrors.scala @@ -1,8 +1,8 @@ package za.co.absa.spark.commons.errorhandling.implementations import org.apache.spark.sql.{Column, DataFrame} -import org.apache.spark.sql.functions.{coalesce, col, collect_list} -import za.co.absa.spark.commons.errorhandling.{ErrorMessageSubmit} +import org.apache.spark.sql.functions.{coalesce, col, lit} +import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit import za.co.absa.spark.commons.errorhandling.partials.ErrorHandlingCommon import za.co.absa.spark.commons.errorhandling.types.ErrorColumn @@ -23,23 +23,23 @@ object ErrorHandlingFilterRowsWithErrors extends ErrorHandlingCommon { } /** - * Evaluate the given column to check if it has errors - * @param errorMessageSubmit the object that has to be evaluated for error purposes - * @return returns the columns with error + * Checks if given column has errors or not + * @param errorMessageSubmit the object that defines the structure of the column + * @return returns true if the column contains an error */ override protected def evaluate(errorMessageSubmit: ErrorMessageSubmit): Column = { - errorMessageSubmit.errMsg.column + lit(true) } /** - * Checks for relationship of the provided clumn in the given dataframe. + * Checks for relationship of the provided column in the given dataframe. * @param dataFrame the overall data structure that need to be aggregated - * @param errCols the row to aggregate the dataframe with + * @param errCols the columns to aggregate the dataframe with * @return Returns the aggregated dataset with errors. */ override protected def doTheAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = { val aggregatedDF = dataFrame.groupBy("errCode") - .agg(coalesce(collect_list("errCols")) as "AggregatedError") + .agg(coalesce(errCols: _*, lit(false)) as "AggregatedError") aggregatedDF.filter(!col("AggregatedError")) } diff --git a/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala new file mode 100644 index 00000000..2c580cdf --- /dev/null +++ b/spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandling/implementations/ErrorHandlingFilterRowsWithErrorsTest.scala @@ -0,0 +1,6 @@ +package za.co.absa.spark.commons.errorhandling.implementations +import org.scalatest.funsuite.AnyFunSuite + +class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite { + +}