Skip to content

Commit

Permalink
* Added UTs for ColumnOrValue
Browse files Browse the repository at this point in the history
* Fixed few minor things discovered by the UTs
  • Loading branch information
benedeki committed Mar 20, 2023
1 parent 8e747e7 commit 64dc95e
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package za.co.absa.spark.commons.errorhandling.implementations.submits

import za.co.absa.spark.commons.errorhandling.ErrorMessageSubmit
import za.co.absa.spark.commons.errorhandling.types.ColumnOrValue.columnNameToItsStringValue
import za.co.absa.spark.commons.errorhandling.types._

class ErrorMessageSubmitOnMoreColumns(
Expand All @@ -27,8 +26,7 @@ class ErrorMessageSubmitOnMoreColumns(
errColNames: Set[ErrSourceColName],
override val additionInfo: ColumnOrValue[AdditionalInfo] = ColumnOrValue.asEmpty
) extends ErrorMessageSubmit {
val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue(errColNames, columnNameToItsStringValue)

val errColsAndValues: ColumnOrValue[ErrColsAndValues] = ColumnOrValue.asMapOfStringColumns(errColNames)
}

object ErrorMessageSubmitOnMoreColumns {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ object ColumnOrValue {

def apply[T](columnName: String): ColumnOrValue[T] = CoVNamedColumn(columnName)
def apply[T](column: Column): ColumnOrValue[T] = CoVDefinedColumn(column)
def apply[T](columnNames: Set[String], columnTransformer: ColumnTransformer): ColumnOrValue[Map[String, T]] = CoVMapColumn(columnNames, columnTransformer) //should it be explicit function?
def apply[T](mapColumnNames: Set[String], columnTransformer: ColumnTransformer): ColumnOrValue[Map[String, T]] = CoVMapColumn(mapColumnNames, columnTransformer) //should it be explicit function?

def withOption(value: Option[String]): ColumnOrValue[Option[String]] = { // could be safely an apply, or done more generally
value match {
case None => CoVNull(StringType)
case _ => CoVValue(value)
case Some(x) => CoVOption(x)
}
}
def withValue[T](value: T): ColumnOrValue[T] = CoVValue(value)

def asEmpty: ColumnOrValue[Option[String]] = CoVNull(StringType)
def asMapOfStringColumns(mapColumnNames: Set[String]): ColumnOrValue[Map[String, String]] = CoVMapColumn(mapColumnNames, columnNameToItsStringValue)

def columnNameToItsStringValue(colName: String): Column = {
col(colName).cast(StringType)
}
def columnNameToItsStringValue(colName: String): Column = col(colName).cast(StringType)

private final case class CoVNamedColumn[T](columnName: String) extends ColumnOrValue[T] {
val column: Column = col(columnName)
Expand Down Expand Up @@ -80,13 +80,22 @@ object ColumnOrValue {
val isValue: Boolean = false
val getValue: Option[Map[String, T]] = None
val column: Column = {
val (mapKeys, mapValues) = columnNames.foldLeft(Seq.empty[Column], Seq.empty[Column]) {case ((accKeys, accVals), colName) =>
val (mapKeys, mapValues) = columnNames.foldRight(Seq.empty[Column], Seq.empty[Column]) {case (colName, (accKeys, accVals)) =>
(typedLit(colName) +: accKeys , columnTransformer(colName) +: accVals)
}
map_from_arrays(array(mapKeys: _*), array(mapValues: _*))
}
}

private final case class CoVOption[T](value: T) extends ColumnOrValue[Option[T]] {
val column: Column = lit(value)

val isColumn: Boolean = false
val isValue: Boolean = true
val columnNames: Set[String] = Set.empty
val getValue: Option[Option[T]] = Some(Some(value))
}

private final case class CoVNull[T](dataType: DataType) extends ColumnOrValue[T] {
val column: Column = null_col(dataType)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.errorhandling.types

import org.apache.spark.sql.Column
import org.scalatest.Assertions

case class ColumnOrValueForm[T] (
column: Column,
isColumn: Boolean,
isValue: Boolean,
columnNames: Set[String],
value: Option[T]
) extends Assertions {
def assertTo(columnOrValue: ColumnOrValue[T]): Unit ={
assert(column == columnOrValue.column)
assert(isColumn == columnOrValue.isColumn)
assert(isValue == columnOrValue.isValue)
assert(columnNames == columnOrValue.columnNames)
assert(value == columnOrValue.getValue)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.errorhandling.types

import net.bytebuddy.dynamic.scaffold.MethodGraph.Empty
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array, col, current_date, lit, map_from_arrays}
import org.apache.spark.sql.types.StringType
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.sql.functions.null_col

class ColumnOrValueTest extends AnyFunSuite {
test("Creation of column based on its name"){
val colName = "my_column"
val expected = ColumnOrValueForm(col(colName), isColumn = true, isValue = false, Set(colName), None)
val result = ColumnOrValue(colName)
expected assertTo result
}

test("Creation of column based on its definition") {
val myColumn = current_date
val expected = ColumnOrValueForm(myColumn, isColumn = true, isValue = false, Set(), None)
val result = ColumnOrValue(myColumn)
expected assertTo result
}

test("Creation of map from given column names") {
val colNames = Set("Col1", "Col2", "Col3")
val colTransformer: String => Column = col
val expectedColumn = map_from_arrays(
array(
lit("Col1"), lit("Col2"), lit("Col3")
), array(
col("Col1"), col("Col2"), col("Col3")
))
val expected = ColumnOrValueForm[Map[String, Any]](expectedColumn, isColumn = true, isValue = false, colNames, None)
val result = ColumnOrValue[Any](colNames, colTransformer)
expected assertTo(result)
}

test("Creating ColumnOrValue from a defined Option") {
val value = "Foo"
val expected = ColumnOrValueForm(lit(value), isColumn = false, isValue = true, Set(), Option(Option(value)))
val result = ColumnOrValue.withOption(Option(value))
expected assertTo result
}

test("Creating ColumnOrValue from an empty Option") {
val expected = ColumnOrValueForm[Option[String]](null_col(StringType), isColumn = false, isValue = true, Set(), None)
val result = ColumnOrValue.withOption(None)
expected assertTo result
}

test("Creating ColumnOrValue from a given value") {
val value = 42
val expected = ColumnOrValueForm(lit(value), isColumn = false, isValue = true, Set(), Some(value))
val result = ColumnOrValue.withValue(value)
expected assertTo result
}

test("Creating ColumnOrValue as an undefined (empty) value") {

val myColumn = null_col(StringType)
val expected = ColumnOrValueForm[Option[String]](myColumn, isColumn = false, isValue = true, Set(), None)
val result = ColumnOrValue.asEmpty
expected assertTo result
}

test("Creating ColumnOrValue as a map of string columns") {
val colNames = Set("Col1", "Col2", "Col3")
val expectedColumn = map_from_arrays(
array(
lit("Col1"), lit("Col2"), lit("Col3")
), array(
col("Col1").cast(StringType), col("Col2").cast(StringType), col("Col3").cast(StringType)
))
val expected = ColumnOrValueForm[Map[String, String]](expectedColumn, isColumn = true, isValue = false, colNames, None)
val result = ColumnOrValue.asMapOfStringColumns(colNames)
expected assertTo(result)
}


}

0 comments on commit 64dc95e

Please sign in to comment.