#170 Add column renaming transformer (#173)
* Add column renaming transformer.
* Extend the e2e test suite with the renaming transformer.
* Add renaming transformer usage description to README.md.
yruslan authored Oct 30, 2020
1 parent 726b1a1 commit 02b6717
Showing 9 changed files with 336 additions and 5 deletions.
README.md (13 additions, 0 deletions)
@@ -170,6 +170,19 @@ Caution: This transformer requires a writer which defines `writer.parquet.destination.directory`
| :--- | :---: | :--- |
| `transformer.{transformer-id}.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`. Default date is the date of the ingestion |

##### ColumnRenamingStreamTransformer
`ColumnRenamingStreamTransformer` renames the columns specified in the configuration.

To add the transformer to the pipeline, use this class name:
```
component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
```

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `transformer.{transformer-id}.columns.rename.from` | Yes | A comma-separated list of columns to rename. For example, `column1, column2`. |
| `transformer.{transformer-id}.columns.rename.to` | Yes | A comma-separated list of new column names. For example, `column1_new, column2_new`. |

See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
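
For illustration, here is roughly how the e2e test in this commit wires the transformer in, expressed as properties (the id `[column.renamer]`, the position index `2`, and the field names are simply the values that test happens to use):

```
component.transformer.id.2 = [column.renamer]
component.transformer.class.[column.renamer] = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
transformer.[column.renamer].columns.rename.from = field1, field2
transformer.[column.renamer].columns.rename.to = field3, field4
```

Columns are renamed positionally: the n-th name in `columns.rename.from` becomes the n-th name in `columns.rename.to`. Both lists must have the same length, and columns that are not listed pass through unchanged.
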
##### ParquetStreamWriter
| Property Name | Required | Description |
@@ -64,6 +64,8 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
"component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
"component.transformer.id.1" -> "[version.incrementer]",
"component.transformer.class.[version.incrementer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer",
"component.transformer.id.2" -> "[column.renamer]",
"component.transformer.class.[column.renamer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer",
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

// Spark settings
@@ -82,6 +84,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
// comma separated list of columns to select
"transformer.[version.incrementer].report.date" -> "2020-03-31",

+ // Rename columns transformer
+ "transformer.[column.renamer].columns.rename.from" -> "field1,field2",
+ "transformer.[column.renamer].columns.rename.to" -> "field3,field4",

// Sink(Parquet) settings
"writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
"writer.parquet.destination.directory" -> destinationDir,
@@ -102,7 +108,7 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
val df = spark.read.parquet(destinationDir)
df.count shouldBe ingestionSize
import spark.implicits._
- df.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+ df.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
df.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1)

// when (2)
@@ -115,10 +121,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
val df2 = spark.read.parquet(destinationDir)
df2.count shouldBe 2 * ingestionSize
import spark.implicits._
- df2.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+ df2.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
df2.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1, 2)
df2.select("field1").distinct().as[String].collect() should contain theSameElementsAs List("hello")
df2.select("field2").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
df2.select("field3").distinct().as[String].collect() should contain theSameElementsAs List("hello")
df2.select("field4").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
df2.select("hyperdrive_date").distinct()
.as[java.sql.Date].collect() should contain theSameElementsAs List(java.sql.Date.valueOf("2020-03-31"))
}
@@ -16,3 +16,4 @@ za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformerLoader
+ za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformerLoader
@@ -0,0 +1,50 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}

private[transformer] class ColumnRenamingStreamTransformer(val columnsFrom: Seq[String], val columnsTo: Seq[String]) extends StreamTransformer {
if (columnsFrom.isEmpty || columnsTo.isEmpty) {
throw new IllegalArgumentException("Empty list of columns to rename.")
}

if (columnsFrom.size != columnsTo.size) {
throw new IllegalArgumentException("The size of source column names doesn't match the list of target column names " +
s"${columnsFrom.size} != ${columnsTo.size}.")
}

override def transform(streamData: DataFrame): DataFrame = {
val renamings = columnsFrom.zip(columnsTo)

renamings.foldLeft(streamData){ case (df, (from, to)) =>
df.withColumnRenamed(from, to)
}
}
}

object ColumnRenamingStreamTransformer extends StreamTransformerFactory with ColumnRenamingStreamTransformerAttributes {
override def apply(config: Configuration): StreamTransformer = {
val columnsFrom = config.getStringArray(KEY_COLUMNS_FROM)
val columnsTo = config.getStringArray(KEY_COLUMNS_TO)
LogManager.getLogger.info(s"Going to create ColumnRenamingStreamTransformer using: " +
s"columnsFrom='${columnsFrom.mkString(",")}', columnsTo='${columnsTo.mkString(",")}'")
new ColumnRenamingStreamTransformer(columnsFrom, columnsTo)
}
}
@@ -0,0 +1,32 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}

trait ColumnRenamingStreamTransformerAttributes extends HasComponentAttributes {
val KEY_COLUMNS_FROM = "columns.rename.from"
val KEY_COLUMNS_TO = "columns.rename.to"

override def getName: String = "Column Renaming Transformer"

override def getDescription: String = "This transformer renames the given columns. Column expressions are not supported."

override def getProperties: Map[String, PropertyMetadata] = Map(
KEY_COLUMNS_FROM -> PropertyMetadata("Source column names", Some("Comma-separated list of columns to be renamed."), required = true),
KEY_COLUMNS_TO -> PropertyMetadata("Target column names", Some("Comma-separated list of new column names. The number of names must match the number of source columns."), required = true)
)
}
@@ -0,0 +1,22 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}

class ColumnRenamingStreamTransformerLoader extends StreamTransformerFactoryProvider {
override def getComponentFactory: StreamTransformerFactory = ColumnRenamingStreamTransformer
}
@@ -24,6 +24,7 @@ import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider}
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
+ import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformer
@@ -44,7 +45,7 @@ class TestServiceProviderConfiguration extends FlatSpec with Matchers {
it should "load StreamTransformers" in {
val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]()
factoryProviders should contain theSameElementsAs Seq(AddDateVersionTransformer,
- ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer)
+ ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer, ColumnRenamingStreamTransformer)
}

it should "load StreamWriters" in {
@@ -0,0 +1,126 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.commons.io.TempDirectory
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterProperties
import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter

class TestColumnRenamingStreamTransformer extends FlatSpec with SparkTestBase with Matchers with BeforeAndAfter {
import spark.implicits._

private var baseDir: TempDirectory = _

private def baseDirPath = baseDir.path.toUri.toString

private def destinationDir = s"$baseDirPath/destination"

private def checkpointDir = s"$baseDirPath/checkpoint"

private val random = scala.util.Random

behavior of "ColumnRenamingStreamTransformer"

before {
baseDir = TempDirectory("testColumnRenamingStreamTransformer").deleteOnExit()
}

after {
baseDir.delete()
}

it should "rename an input column" in {
val config = new BaseConfiguration()
config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value")
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v")
val underTest = ColumnRenamingStreamTransformer(config)
val df = getDummyReadStream().toDF()

executeQuery(underTest.transform(df))

val actualDf = spark.read.parquet(destinationDir)

assert(df.schema.exists(f => f.name == "value"))
assert(!df.schema.exists(f => f.name == "v"))
assert(actualDf.schema.exists(f => f.name == "v"))
assert(!actualDf.schema.exists(f => f.name == "value"))
}

it should "rename multiple columns while leaving existing columns intact" in {
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))

config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1, v2")
val underTest = ColumnRenamingStreamTransformer(config)
val df = getDummyReadStream().toDF()
.withColumn("value2", col("value"))
.withColumn("value3", col("value"))

executeQuery(underTest.transform(df))

val actualDf = spark.read.parquet(destinationDir)

actualDf.printSchema()
assert(actualDf.schema.exists(f => f.name == "v1"))
assert(actualDf.schema.exists(f => f.name == "v2"))
assert(actualDf.schema.exists(f => f.name == "value3"))

assert(!actualDf.schema.exists(f => f.name == "value"))
assert(!actualDf.schema.exists(f => f.name == "value2"))
}

it should "throw an exception if columns from do not match columns to" in {
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))

config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1")

val ex = intercept[IllegalArgumentException] {
ColumnRenamingStreamTransformer(config)
}

assert(ex.getMessage.contains("The size of source column names doesn't match"))
}

private def executeQuery(df: DataFrame): Unit = {
val query = df
.writeStream
.option(StreamWriterProperties.CheckpointLocation, checkpointDir)
.outputMode(OutputMode.Append)
.trigger(Trigger.Once)
.start(destinationDir)
query.awaitTermination()
}

private def getDummyReadStream(): MemoryStream[Int] = {
val input = MemoryStream[Int](random.nextInt(), spark.sqlContext)
input.addData(List.range(0, 100))
input
}
}
@@ -0,0 +1,80 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.scalatest.FlatSpec

import ColumnRenamingStreamTransformer._

class TestColumnRenamingStreamTransformerObject extends FlatSpec {
behavior of ColumnRenamingStreamTransformer.getClass.getSimpleName

it should "create ColumnRenamingStreamTransformer for column pairs specified in configurations" in {
val columnsFrom = Seq("a", "b", "c")
val columnsTo = Seq("A", "b2", "C3")
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))
config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))

val transformer = ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
assert(columnsFrom == transformer.columnsFrom)
assert(columnsTo == transformer.columnsTo)
}

it should "throw an exception if source columns are not specified" in {
val columnsTo = Seq("A", "b2", "C3")
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))

val ex = intercept[IllegalArgumentException] {
ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
}

assert(ex.getMessage.contains("Empty list of columns to rename."))
}

it should "throw an exception if target columns are not specified" in {
val columnsFrom = Seq("a", "b", "c")
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))

val ex = intercept[IllegalArgumentException] {
ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
}

assert(ex.getMessage.contains("Empty list of columns to rename."))
}

it should "throw an exception if two lists do not match" in {
val columnsFrom = Seq("a", "b", "c")
val columnsTo = Seq("A", "b2")
val config = new DynamicCombinedConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))
config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))

val ex = intercept[IllegalArgumentException] {
ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
}

assert(ex.getMessage.contains("The size of source column names doesn't match"))
}
}
