Skip to content

Commit

Permalink
#170 Add column renaming transformer.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 30, 2020
1 parent 8cf8e3c commit 25f28e0
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.Colum
za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformerLoader
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}

private[transformer] class ColumnRenamingStreamTransformer(val columnsFrom: Seq[String], val columnsTo: Seq[String]) extends StreamTransformer {
if (columnsFrom.isEmpty || columnsTo.isEmpty) {
throw new IllegalArgumentException("Empty list of columns to rename.")
}

if (columnsFrom.size != columnsTo.size) {
throw new IllegalArgumentException("The size of source column names doesn't match the list of target column names " +
s"${columnsFrom.size} != ${columnsTo.size}.")
}

override def transform(streamData: DataFrame): DataFrame = {
val renamings = columnsFrom.zip(columnsTo)

renamings.foldLeft(streamData){ case (df, (from, to)) =>
df.withColumnRenamed(from, to)
}
}
}

object ColumnRenamingStreamTransformer extends StreamTransformerFactory with ColumnRenamingStreamTransformerAttributes {
override def apply(config: Configuration): StreamTransformer = {
val columnsFrom = config.getStringArray(KEY_COLUMNS_FROM)
val columnsTo = config.getStringArray(KEY_COLUMNS_TO)
LogManager.getLogger.info(s"Going to create ColumnRenamingStreamTransformer using: columnsFrom='$columnsFrom', columnsTo='$columnsTo'")
new ColumnRenamingStreamTransformer(columnsFrom, columnsTo)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}

trait ColumnRenamingStreamTransformerAttributes extends HasComponentAttributes {
val KEY_COLUMNS_FROM = "columns.rename.from"
val KEY_COLUMNS_TO = "columns.rename.to"

override def getName: String = "Column Renaming Transformer"

override def getDescription: String = "This transformer renames given columns. Column expressions are not possible"

override def getProperties: Map[String, PropertyMetadata] = Map(
KEY_COLUMNS_FROM -> PropertyMetadata("Source column names", Some("Comma separated list of columns to be renames."), required = true),
KEY_COLUMNS_TO -> PropertyMetadata("Target column names", Some("Comma separated list of new names of the columns. The number of columns should match the list of source columns."), required = true)
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}

class ColumnRenamingStreamTransformerLoader extends StreamTransformerFactoryProvider {
override def getComponentFactory: StreamTransformerFactory = ColumnRenamingStreamTransformer
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWri
import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider}
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
import za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformer
Expand All @@ -44,7 +45,7 @@ class TestServiceProviderConfiguration extends FlatSpec with Matchers {
it should "load StreamTransformers" in {
val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]()
factoryProviders should contain theSameElementsAs Seq(AddDateVersionTransformer,
ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer)
ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer, ColumnRenamingStreamTransformer)
}

it should "load StreamWriters" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming

import org.apache.commons.configuration2.BaseConfiguration
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.commons.io.TempDirectory
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterProperties
import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter

class TestColumnRenamingStreamTransformer extends FlatSpec with SparkTestBase with Matchers with BeforeAndAfter {
import spark.implicits._

private var baseDir: TempDirectory = _

private def baseDirPath = baseDir.path.toUri.toString

private def destinationDir = s"$baseDirPath/destination"

private def checkpointDir = s"$baseDirPath/checkpoint"

private val random = scala.util.Random

behavior of "ColumnRenamingStreamTransformer"

before {
baseDir = TempDirectory("testColumnRenamingStreamTransformer").deleteOnExit()
}

after {
baseDir.delete()
}

it should "rename input column" in {
val config = new BaseConfiguration()
config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value")
config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v")
val underTest = ColumnRenamingStreamTransformer(config)
val df = getDummyReadStream().toDF()

executeQuery(underTest.transform(df))

val actualDf = spark.read.parquet(destinationDir)

assert(df.schema.exists(f => f.name == "value"))
assert(!df.schema.exists(f => f.name == "v"))
assert(actualDf.schema.exists(f => f.name == "v"))
assert(!actualDf.schema.exists(f => f.name == "value"))
}


private def executeQuery(df: DataFrame): Unit = {
val query = df
.writeStream
.option(StreamWriterProperties.CheckpointLocation, checkpointDir)
.outputMode(OutputMode.Append)
.trigger(Trigger.Once)
.start(destinationDir)
query.awaitTermination()
}

private def getDummyReadStream(): MemoryStream[Int] = {
val input = MemoryStream[Int](random.nextInt(), spark.sqlContext)
input.addData(List.range(0, 100))
input
}
}

0 comments on commit 25f28e0

Please sign in to comment.