#170 Extend the e2e test suite with the renaming transformer.
yruslan committed Oct 30, 2020
1 parent c7dccb1 commit b4f4316
Showing 1 changed file with 10 additions and 4 deletions.
@@ -64,6 +64,8 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
"component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
"component.transformer.id.1" -> "[version.incrementer]",
"component.transformer.class.[version.incrementer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer",
"component.transformer.id.2" -> "[column.renamer]",
"component.transformer.class.[column.renamer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer",
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

// Spark settings
@@ -82,6 +84,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
// comma separated list of columns to select
"transformer.[version.incrementer].report.date" -> "2020-03-31",

+ // Rename columns transformer
+ "transformer.[column.renamer].columns.rename.from" -> "field1,field2",
+ "transformer.[column.renamer].columns.rename.to" -> "field3,field4",
+
// Sink(Parquet) settings
"writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
"writer.parquet.destination.directory" -> destinationDir,
@@ -102,7 +108,7 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
val df = spark.read.parquet(destinationDir)
df.count shouldBe ingestionSize
import spark.implicits._
- df.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+ df.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
df.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1)

// when (2)
@@ -115,10 +121,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
val df2 = spark.read.parquet(destinationDir)
df2.count shouldBe 2 * ingestionSize
import spark.implicits._
- df2.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+ df2.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
df2.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1, 2)
df2.select("field1").distinct().as[String].collect() should contain theSameElementsAs List("hello")
df2.select("field2").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
df2.select("field3").distinct().as[String].collect() should contain theSameElementsAs List("hello")
df2.select("field4").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
df2.select("hyperdrive_date").distinct()
.as[java.sql.Date].collect() should contain theSameElementsAs List(java.sql.Date.valueOf("2020-03-31"))
}
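For context: the renaming transformer is configured through two parallel comma-separated lists, columns.rename.from and columns.rename.to, which this test pairs up as field1 -> field3 and field2 -> field4. Below is a minimal sketch of how such a transformer could apply those settings to a Spark DataFrame, assuming it simply zips the two lists and renames pairwise; the actual ColumnRenamingStreamTransformer implementation is not part of this diff, and the helper name renameColumns is hypothetical.

import org.apache.spark.sql.DataFrame

// Hypothetical helper, not Hyperdrive's actual implementation:
// applies "columns.rename.from"/"columns.rename.to" style settings
// by renaming columns pairwise.
def renameColumns(df: DataFrame, from: String, to: String): DataFrame = {
  val fromCols = from.split(",").map(_.trim)
  val toCols = to.split(",").map(_.trim)
  require(fromCols.length == toCols.length,
    "columns.rename.from and columns.rename.to must have the same number of entries")
  // Fold over the (from, to) pairs, renaming one column at a time.
  fromCols.zip(toCols).foldLeft(df) { case (acc, (f, t)) =>
    acc.withColumnRenamed(f, t)
  }
}

// Mirrors the test configuration: field1 -> field3, field2 -> field4.
// val renamed = renameColumns(inputDf, "field1,field2", "field3,field4")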