Skip to content

Commit

Permalink
#93 - Unit test work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Apr 25, 2023
1 parent 6b016f5 commit 473a078
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,12 @@ import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, lit}
import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.partials.ErrorHandlingCommon
import za.co.absa.spark.commons.errorhandling.types.ErrorColumn

/**
* Class implement the functionality of filtering rows with columns.
*/

object ErrorHandlingFilterRowsWithErrors extends ErrorHandlingCommon {
// /**
// * Method return the rows that has errors on aggregation
// * @param dataFrame the overall data structure that need to be checked for rows with errors
// * @param errCols a final case class that provide error columns
// * @return returns rows with errors
// */
// override def aggregateErrorColumns(dataFrame: DataFrame)(errCols: ErrorColumn*): DataFrame = {
// doTheColumnsAggregation(dataFrame, errCols.map(_.column): _*)
// }

/**
* Creates a column with the error description, in this particular case actually only signals with a boolean flag there was an error in the row.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

package za.co.absa.spark.commons.errorhandling.implementations
import org.apache.hadoop.shaded.com.sun.jersey.spi.inject.Errors.ErrorMessage
//import org.apache.hadoop.shaded.com.sun.jersey.spi.inject.Errors.ErrorMessage
import org.apache.spark.sql.DataFrame
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.errorhandling.ErrorMessage
Expand All @@ -28,7 +28,7 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa

private val col1Name = "Col1"
private val col2Name = "Col2"
private val data = Seq(
private val srcDf = Seq(
(None, ""),
(Some(1), "a"),
(Some(2), "bb"),
Expand All @@ -42,7 +42,7 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa
}

test("aggregateErrorColumns\" should \"return a DataFrame with the specified columns aggregated\"") {
val expected: List[ResultDfRecordType] = List(
val expectedResults: List[ResultDfRecordType] = List(
(None, "", List(
ErrorMessage("Test error 1", 1, "This is a test error", Map("Col1" -> nullString)),
ErrorMessage("Test error 2", 2, "This is a test error", Map("Col2" -> "")),
Expand Down Expand Up @@ -71,12 +71,11 @@ class ErrorHandlingFilterRowsWithErrorsTest extends AnyFunSuite with SparkTestBa
val errorSubmitB = ErrorMessageSubmitWithoutColumn("Test error 3", 3, "This is a test error")
val e3 = ErrorHandlingFilterRowsWithErrors.putErrorToColumn(errorSubmitB)

val dfSchema = Seq("Col1","Col2").toList
val resultsDF = ErrorHandlingFilterRowsWithErrors.aggregateErrorColumns(srcDf)(e1, e2, e3)
resultsDF.show()
val results = resultDfToResult(resultsDF)

val results = ErrorHandlingFilterRowsWithErrors.aggregateErrorColumns(data)(e1, e2, e3)
results.printSchema()
results.show(false)
// assert(results.schema == dfSchema)
assert(results == expectedResults)
}

}

0 comments on commit 473a078

Please sign in to comment.