Skip to content

Commit

Permalink
#93 - Unit test work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 28, 2023
1 parent 58ecd56 commit 786ea07
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ object ErrorHandlingFilterRowsWithErrors extends ErrorHandlingCommon {
*/
override protected def doTheColumnsAggregation(dataFrame: DataFrame, errCols: Column*): DataFrame = {
val columns: Seq[Column] = errCols :+ lit(false)
// dataFrame.filter(!coalesce(columns: _*).isNull)
val aggregatedDF = dataFrame
.withColumn("AggregatedError", coalesce(columns: _*))
aggregatedDF.filter(!col("AggregatedError")).drop("AggregatedError")
dataFrame.filter(!coalesce(columns: _*))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
*/

package za.co.absa.spark.commons.errorhandling.implementations
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions.{col, length}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, column, length}
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.spark.commons.errorhandling.implementations.submits.{ErrorMessageSubmitOnColumn, ErrorMessageSubmitWithoutColumn}
import za.co.absa.spark.commons.errorhandling.types.ErrorWhen

class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBase {
import spark.implicits._
Expand All @@ -37,7 +38,7 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa
resultDf.as[ResultDfRecordType].collect().sortBy(_._1).toList
}

test("aggregateErrorColumns\" should \"return a DataFrame with the specified columns aggregated\"") {
test("aggregateErrorColumns\" should \"return an empty list after error aggregation\"") {
val expectedResults: List[ResultDfRecordType] = List()

val e1 = ErrorHandlingFilterRowsWithErrors.putErrorToColumn("Test error 1", 1, "This is a test error", Some(col1Name))
Expand All @@ -52,6 +53,28 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa
val results = resultDfToResult(resultsDF)
println("Results: ", results)

assert(results.length == expectedResults.length)
assert(results == expectedResults)
}

test("aggregateErrorColumns\" should \"return records whose don't have errors\"") {
val expectedResults: List[ResultDfRecordType] = List(
(Some(1),"a"),
(Some(2),"bb")
)

val er1 = ErrorWhen(col(col1Name).isNull, ErrorMessageSubmitWithoutColumn("Test error 3", 0, "This is a test error"))
val er2 = ErrorWhen(col(col1Name) > 2, ErrorMessageSubmitOnColumn("ValueTooBig", 1, "The value of the column is too big", col1Name))
val er3 = ErrorWhen(length(col(col2Name)) > 2, ErrorMessageSubmitOnColumn("String too long", 5, "The text in the field is too long", col2Name))

// The putErrorsWithGrouping calls the doAggregationErrorColumns method implemented in ErrorHandlingFilterRowsWithErrors object
val resultsDf = ErrorHandlingFilterRowsWithErrors.putErrorsWithGrouping(srcDf)(
Seq(er1, er2, er3)
)
resultsDf.show()

val results = resultDfToResult(resultsDf)

assert(results == expectedResults)
}

Expand Down

0 comments on commit 786ea07

Please sign in to comment.