Skip to content

Commit

Permalink
#170 Add more unit tests for the renaming transformer.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 22, 2020
1 parent 69dca89 commit f2bf19f
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ object ColumnRenamingStreamTransformer extends StreamTransformerFactory with Col
/**
 * Creates a [[ColumnRenamingStreamTransformer]] from the column lists stored in the configuration.
 *
 * @param config configuration from which the KEY_COLUMNS_FROM and KEY_COLUMNS_TO string arrays are read
 * @return a transformer constructed from the source and target column arrays
 */
override def apply(config: Configuration): StreamTransformer = {
  val columnsFrom = config.getStringArray(KEY_COLUMNS_FROM)
  val columnsTo = config.getStringArray(KEY_COLUMNS_TO)
  // Join the arrays explicitly: interpolating a JVM array directly prints its type and
  // identity hash instead of the column names it holds.
  LogManager.getLogger.info(s"Going to create ColumnRenamingStreamTransformer using: " +
    s"columnsFrom='${columnsFrom.mkString(",")}', columnsTo='${columnsTo.mkString(",")}'")
  new ColumnRenamingStreamTransformer(columnsFrom, columnsTo)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
Expand Down Expand Up @@ -48,7 +50,7 @@ class TestColumnRenamingStreamTransformer extends FlatSpec with SparkTestBase wi
baseDir.delete()
}

it should "rename input column" in {
it should "rename an input column" in {
val config = new BaseConfiguration()
config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value")
Expand All @@ -66,6 +68,45 @@ class TestColumnRenamingStreamTransformer extends FlatSpec with SparkTestBase wi
assert(!actualDf.schema.exists(f => f.name == "value"))
}

// Renames two of three columns and checks that only the configured ones change their names.
it should "rename multiple columns while leaving existing columns intact" in {
  val config = new DynamicCombinedConfiguration()
  // Use ',' as the list delimiter; the column lists below are given as comma-separated strings.
  config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
  config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
  config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
  config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1, v2")

  val underTest = ColumnRenamingStreamTransformer(config)

  // Build an input with three columns: "value" plus two copies of it.
  val valueColumn = col("value")
  val inputDf = getDummyReadStream().toDF()
    .withColumn("value2", valueColumn)
    .withColumn("value3", valueColumn)

  executeQuery(underTest.transform(inputDf))

  val actualDf = spark.read.parquet(destinationDir)
  actualDf.printSchema()

  val writtenSchema = actualDf.schema
  def hasField(name: String): Boolean = writtenSchema.exists(_.name == name)

  // Renamed targets and the untouched column must be present...
  assert(hasField("v1"))
  assert(hasField("v2"))
  assert(hasField("value3"))

  // ...while the original source names must be gone.
  assert(!hasField("value"))
  assert(!hasField("value2"))
}

// Two source columns but only one target column: creating the transformer must be rejected.
it should "throw an exception if columns from do not match columns to" in {
  val config = new DynamicCombinedConfiguration()
  // Use ',' as the list delimiter; the source column list below is a comma-separated string.
  config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
  config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
  config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
  config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1")

  val thrown = intercept[IllegalArgumentException](ColumnRenamingStreamTransformer(config))
  val message = thrown.getMessage

  assert(message.contains("The size of source column names doesn't match"))
}

private def executeQuery(df: DataFrame): Unit = {
val query = df
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.scalatest.FlatSpec

import ColumnRenamingStreamTransformer._

/**
 * Unit tests for the `ColumnRenamingStreamTransformer` factory object.
 *
 * Covers creating a transformer from configured column lists and the
 * `IllegalArgumentException`s expected when the lists are missing or differ in size.
 */
class TestColumnRenamingStreamTransformerObject extends FlatSpec {
  behavior of ColumnRenamingStreamTransformer.getClass.getSimpleName

  it should "create ColumnRenamingStreamTransformer for column pairs specified in configurations" in {
    val columnsFrom = Seq("a", "b", "c")
    val columnsTo = Seq("A", "b2", "C3")
    val config = createConfig(KEY_COLUMNS_FROM -> columnsFrom, KEY_COLUMNS_TO -> columnsTo)

    // Match on the concrete type instead of an unchecked downcast so that an unexpected
    // implementation fails the test with a clear message.
    ColumnRenamingStreamTransformer(config) match {
      case transformer: ColumnRenamingStreamTransformer =>
        assert(columnsFrom == transformer.columnsFrom)
        assert(columnsTo == transformer.columnsTo)
      case other =>
        fail(s"Expected a ColumnRenamingStreamTransformer but got ${other.getClass.getName}")
    }
  }

  it should "throw an exception if source columns are not specified" in {
    val config = createConfig(KEY_COLUMNS_TO -> Seq("A", "b2", "C3"))

    val ex = intercept[IllegalArgumentException] {
      ColumnRenamingStreamTransformer(config)
    }

    assert(ex.getMessage.contains("Empty list of columns to rename."))
  }

  it should "throw an exception if target columns are not specified" in {
    val config = createConfig(KEY_COLUMNS_FROM -> Seq("a", "b", "c"))

    val ex = intercept[IllegalArgumentException] {
      ColumnRenamingStreamTransformer(config)
    }

    assert(ex.getMessage.contains("Empty list of columns to rename."))
  }

  it should "throw an exception if two lists do not match" in {
    val config = createConfig(
      KEY_COLUMNS_FROM -> Seq("a", "b", "c"),
      KEY_COLUMNS_TO -> Seq("A", "b2")
    )

    val ex = intercept[IllegalArgumentException] {
      ColumnRenamingStreamTransformer(config)
    }

    assert(ex.getMessage.contains("The size of source column names doesn't match"))
  }

  /**
   * Builds a configuration that uses ',' as the list delimiter and stores each given
   * column list under its key as a comma-separated string.
   *
   * @param columnLists pairs of configuration key and the column names to store under it
   * @return the populated configuration
   */
  private def createConfig(columnLists: (String, Seq[String])*): DynamicCombinedConfiguration = {
    val config = new DynamicCombinedConfiguration()
    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
    columnLists.foreach { case (key, columns) =>
      config.addProperty(key, columns.mkString(","))
    }
    config
  }
}

0 comments on commit f2bf19f

Please sign in to comment.