Skip to content

Commit

Permalink
Fix test, don't use RowEncoder because it doesn't transfer metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinwallimann committed Feb 8, 2022
1 parent e65ac09 commit c9d1b22
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
Expand All @@ -36,6 +37,8 @@ import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter

import scala.reflect.ClassTag

class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with BeforeAndAfter with SparkTestBase {

private val topic = "topic"
Expand Down Expand Up @@ -173,7 +176,9 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
StructField("col1", IntegerType, nullable = true,
new MetadataBuilder().putString(SparkMetadataKeys.DefaultValueKey, "42").build()))
)
val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(schema))

import scala.collection.JavaConverters._
val df = spark.createDataFrame(Seq[Row]().asJava, schema)

val config = new BaseConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
Expand All @@ -197,10 +202,24 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
}

// when
encoder.transform(memoryStream.toDF())
encoder.transform(df)

// then
val schemaMetadata = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-value")
schemaMetadata.getSchema shouldBe expectedSchemaString
}
}

class MetadataSafeExpressionEncoder[T](objSerializer2: Expression, objDeserializer2: Expression, clsTag2: ClassTag[T])
extends ExpressionEncoder[T](objSerializer2, objDeserializer2, clsTag2) {

override val schema: StructType = StructType(serializer.map { s =>
StructField(s.name, s.dataType, s.nullable, s.metadata)
})

override protected val schemaString: String =
schema
.zip(attrs)
.map { case(f, a) => s"${f.name}$a: ${f.dataType.simpleString}"}.mkString(", ")

}

0 comments on commit c9d1b22

Please sign in to comment.