Commit

Feature/138 preserve schema (#257)
* Initial impl

* Initial tests

* Refactorings

* Add provider config

* Add some tests

* Add tests

* Add more tests / small refactoring

* Add documentation. Rename metadata keys

* Fix scalastyle issues

* Exclude .json from apache-rat check

* Upgrade abris version

* Fix test, don't use RowEncoder because it doesn't transfer metadata

* PR fixes
kevinwallimann authored Feb 15, 2022
1 parent 404faa5 commit 712fe9b
Showing 27 changed files with 1,246 additions and 24 deletions.
README.md (17 changes: 11 additions & 6 deletions)

@@ -123,9 +123,9 @@ If no file exists, the reader will fail.

Any additional properties can be added with the prefix `reader.parquet.options.`. See [Spark Structured Streaming Documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)

-##### ConfluentAvroStreamDecodingTransformer
-The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamDecodingTransformer` requires the property `reader.kafka.topic` to be set.
+##### ConfluentAvroDecodingTransformer
+The `ConfluentAvroDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroDecodingTransformer` requires the property `reader.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
Expand All @@ -142,14 +142,18 @@ The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to `true` to disable the fix for [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and keep the same behaviour as in versions up to and including v3.2.2. Default value: `false` |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to convert the Avro schema using [AdvancedAvroToSparkConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedAvroToSparkConverter.scala), which puts the default value and underlying Avro type into struct field metadata. Default value: `false` |

For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
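
For example, a decoder with advanced schema conversion enabled could be configured roughly as follows. This is a sketch, not verbatim from this PR: the transformer id `[avro.decoder]`, topic and registry URL are placeholders, and the `component.transformer.*` wiring follows the pattern used elsewhere in this README.

```
component.transformer.id.0=[avro.decoder]
component.transformer.class.[avro.decoder]=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
transformer.[avro.decoder].schema.registry.url=http://localhost:8081
transformer.[avro.decoder].value.schema.naming.strategy=topic.name
# Preserve avro defaults and underlying types in struct field metadata
transformer.[avro.decoder].use.advanced.schema.conversion=true
```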

-##### ConfluentAvroStreamEncodingTransformer
-The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamEncodingTransformer` requires the property `writer.kafka.topic` to be set.
+Note: `use.advanced.schema.conversion` only works with a patched version of Spark, due to bug [SPARK-34805](https://issues.apache.org/jira/browse/SPARK-34805).
+For the latest version of Spark, the patch is available in https://github.com/apache/spark/pull/35270. For other versions of Spark, the changes need to be cherry-picked and built locally.
+
+##### ConfluentAvroEncodingTransformer
+The `ConfluentAvroEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroEncodingTransformer` requires the property `writer.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
Expand All @@ -164,6 +168,7 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value `null` in the Avro schema. Nested column names should be concatenated with a dot (`.`) |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to convert the Avro schema using [AdvancedSparkToAvroConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedSparkToAvroConverter.scala), which reads the default value and underlying Avro type from struct field metadata. Default value: `false` |

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
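
A matching encoder configuration could look roughly like the sketch below (again with placeholder transformer id and registry URL, following the same `component.transformer.*` pattern):

```
component.transformer.id.1=[avro.encoder]
component.transformer.class.[avro.encoder]=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer
transformer.[avro.encoder].schema.registry.url=http://localhost:8081
transformer.[avro.encoder].value.schema.naming.strategy=topic.name
# Re-create the original avro schema (defaults, underlying types) from column metadata
transformer.[avro.encoder].use.advanced.schema.conversion=true
```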

@@ -0,0 +1,15 @@
#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter
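
This is presumably a `META-INF/services` registration file, through which Java's `ServiceLoader` (and thus ABRiS) can discover the `AdvancedAvroToSparkConverter` implementation at runtime.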
@@ -0,0 +1,135 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.abris.avro.sql.SchemaConverter
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

// scalastyle:off
class AdvancedAvroToSparkConverter extends SchemaConverter {
  override val shortName: String = AdvancedAvroToSparkConverter.name
  private lazy val objectMapper = new ObjectMapper()

  case class SchemaType(dataType: DataType, nullable: Boolean, avroType: Option[Schema])

  /**
   * This function takes an avro schema and returns a sql schema.
   */
  override def toSqlType(avroSchema: Schema): DataType = {
    toSqlTypeHelper(avroSchema, Set.empty).dataType
  }

  def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
    avroSchema.getType match {
      case RECORD =>
        if (existingRecordNames.contains(avroSchema.getFullName)) {
          throw new IncompatibleSchemaException(s"""
            |Found recursive reference in Avro schema, which can not be processed by Spark:
            |${avroSchema.toString(true)}
          """.stripMargin)
        }
        val newRecordNames = existingRecordNames + avroSchema.getFullName
        val fields = avroSchema.getFields.asScala.map { f =>
          val metadataBuilder = new MetadataBuilder()
          val defaultJsonOpt = Option(JacksonUtils.toJsonNode(f.defaultVal()))
          val metadataBuilderWithDefault = defaultJsonOpt match {
            case Some(defaultJson) =>
              val baos = new ByteArrayOutputStream()
              objectMapper.writeValue(baos, defaultJson)
              val r = metadataBuilder.putString(DefaultValueKey, baos.toString)
              baos.close()
              r
            case None => metadataBuilder
          }

          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
          schemaType.avroType
            .map(_.toString)
            .map(schema => metadataBuilderWithDefault.putString(AvroTypeKey, schema).build())
            .map(metadata => StructField(f.name, schemaType.dataType, schemaType.nullable, metadata))
            .getOrElse(StructField(f.name, schemaType.dataType, schemaType.nullable, metadataBuilderWithDefault.build()))
        }

        SchemaType(StructType(fields), nullable = false, None)

      case ARRAY =>
        val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
        SchemaType(
          ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case MAP =>
        val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
        SchemaType(
          MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case UNION =>
        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
          // In case of a union with null, eliminate it and make a recursive call
          val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
          if (remainingUnionTypes.size == 1) {
            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
          } else {
            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
              .copy(nullable = true)
          }
        } else avroSchema.getTypes.asScala.map(_.getType) match {
          case Seq(t1) =>
            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            SchemaType(LongType, nullable = false, Option(avroSchema))
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            SchemaType(DoubleType, nullable = false, Option(avroSchema))
          case _ =>
            // Convert complex unions to struct types where field names are member0, member1, etc.
            // This is consistent with the behavior when converting between Avro and Parquet.
            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
              case (s, i) =>
                val schemaType = toSqlTypeHelper(s, existingRecordNames)
                schemaType.avroType
                  .map(_.toString)
                  .map(schema => new MetadataBuilder().putString(AvroTypeKey, schema).build())
                  .map(metadata => StructField(s"member$i", schemaType.dataType, schemaType.nullable, metadata))
                  // All fields are nullable because only one of them is set at a time
                  .getOrElse(StructField(s"member$i", schemaType.dataType, nullable = true))
            }

            SchemaType(StructType(fields), nullable = false, None)
        }

      case _ =>
        val originalSchemaType = SchemaConverters.toSqlType(avroSchema)
        SchemaType(originalSchemaType.dataType, originalSchemaType.nullable, Option(avroSchema))
    }
  }
}

// scalastyle:on
object AdvancedAvroToSparkConverter {
  val name = "advanced"
}
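
Registered under the short name `advanced`, the converter can then be selected through ABRiS. Below is a minimal decoding sketch, assuming ABRiS 5.x, where `SchemaConverter` implementations are discovered via Java's `ServiceLoader` and chosen with `withSchemaConverter`; the topic, registry URL and `kafkaDf` are placeholders:

```scala
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val fromAvroConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")             // placeholder topic
  .usingSchemaRegistry("http://localhost:8081") // placeholder registry URL
  .withSchemaConverter(AdvancedAvroToSparkConverter.name) // selects "advanced"

// kafkaDf is assumed to be a streaming DataFrame read from Kafka,
// with the Avro payload in its binary `value` column.
val decoded = kafkaDf.select(from_avro(col("value"), fromAvroConfig).as("data"))
```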
@@ -0,0 +1,131 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.LogicalTypes.TimestampMillis
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.avro.{JsonProperties, LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper

import java.util.Objects
import scala.util.Try
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
  private lazy val nullSchema = Schema.create(Schema.Type.NULL)
  private lazy val objectMapper = new ObjectMapper()

  override def apply(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
    toAvroType(catalystType, None, nullable, None, recordName, nameSpace)

  // scalastyle:off
  private def toAvroType(
    catalystType: DataType,
    avroSchema: Option[Schema],
    nullable: Boolean = false,
    defaultValue: Option[Object] = None,
    recordName: String = "topLevelRecord",
    nameSpace: String = "")
  : Schema = {
    val builder = SchemaBuilder.builder()

    val schema = catalystType match {
      case TimestampType => avroSchema match {
        case Some(schema) if schema.getLogicalType.isInstanceOf[TimestampMillis] =>
          LogicalTypes.timestampMillis().addToSchema(builder.longType())
        case _ => LogicalTypes.timestampMicros().addToSchema(builder.longType())
      }
      case d: DecimalType => avroSchema match {
        case Some(schema) if schema.getType == BYTES =>
          val avroType = LogicalTypes.decimal(d.precision, d.scale)
          avroType.addToSchema(SchemaBuilder.builder().bytesType())
        case _ => getDecimalFixedType(d, avroSchema, nameSpace, recordName)
      }
      case BinaryType => avroSchema match {
        case Some(schema) if schema.getType == FIXED =>
          val name = getFixedName(recordName, nameSpace)
          builder
            .fixed(name)
            .size(schema.getFixedSize)
        case _ => builder.bytesType()
      }
      case ArrayType(et, containsNull) =>
        builder.array()
          .items(toAvroType(et, avroSchema, containsNull, defaultValue, recordName, nameSpace))
      case MapType(StringType, vt, valueContainsNull) =>
        builder.map()
          .values(toAvroType(vt, avroSchema, valueContainsNull, defaultValue, recordName, nameSpace))
      case st: StructType =>
        val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
        val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
        st.foreach { f =>
          val schema = Try(f.metadata.getString(AvroTypeKey)).toOption
            .map(schema => new Schema.Parser().parse(schema))
          val defaultValueOpt = Try(f.metadata.getString(DefaultValueKey))
            .flatMap(defaultJsonString => Try {
              val jsonNode = objectMapper.readTree(defaultJsonString)
              JacksonUtils.toObject(jsonNode)
            }).toOption
          val fieldAvroType =
            toAvroType(f.dataType, schema, f.nullable, defaultValueOpt, f.name, childNameSpace)
          defaultValueOpt match {
            case Some(defaultObject) if !Objects.equals(defaultObject, JsonProperties.NULL_VALUE) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(defaultObject)
            case Some(_) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(null)
            case _ => fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
          }
        }
        fieldsAssembler.endRecord()

      // nullability is handled later in this method, thus pass nullable = false
      case _ => SchemaConverters.toAvroType(catalystType, nullable = false, recordName, nameSpace)
    }
    if (nullable) {
      defaultValue match {
        case Some(value) if !value.isInstanceOf[JsonProperties.Null] => Schema.createUnion(schema, nullSchema)
        case _ => Schema.createUnion(nullSchema, schema)
      }
    } else {
      schema
    }
  }

  // scalastyle:on
  private def getDecimalFixedType(d: DecimalType, avroSchema: Option[Schema], nameSpace: String, recordName: String) = {
    val avroType = LogicalTypes.decimal(d.precision, d.scale)
    val name = getFixedName(recordName, nameSpace)
    val minBytes = minBytesForPrecision(d.precision)
    val size = avroSchema.map { schema =>
      if (schema.getFixedSize > minBytes) schema.getFixedSize else minBytes
    }.getOrElse {
      minBytes
    }
    avroType.addToSchema(SchemaBuilder.fixed(name).size(size))
  }

  private def getFixedName(recordName: String, nameSpace: String) = {
    // Need to avoid naming conflict for the fixed fields
    nameSpace match {
      case "" => s"$recordName.fixed"
      case _ => s"$nameSpace.$recordName.fixed"
    }
  }
}
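
As a hedged round-trip illustration (the metadata keys come from `SparkMetadataKeys`, as used above; the field name, default value and record names are made up):

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

// Metadata as AdvancedAvroToSparkConverter would have written it:
// the original avro type and the JSON-encoded default value.
val metadata = new MetadataBuilder()
  .putString(AvroTypeKey, "\"string\"")
  .putString(DefaultValueKey, "\"unknown\"")
  .build()

val sparkSchema = StructType(Seq(
  StructField("name", StringType, true, metadata) // nullable = true
))

// Expected result: field "name" becomes a union of string and null
// with default "unknown", instead of losing the default value.
val avroSchema = AdvancedSparkToAvroConverter(sparkSchema, false, "myRecord", "com.example")
println(avroSchema.toString(true))
```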
@@ -113,6 +113,7 @@ object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with Co
    override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
    override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
    override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
    override val useAdvancedSchemaConversion: String = KEY_USE_ADVANCED_SCHEMA_CONVERSION
  }

  override def apply(config: Configuration): StreamTransformer = {
@@ -32,7 +32,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes

  val KEY_KEEP_COLUMNS = "keep.columns"
  val KEY_DISABLE_NULLABILITY_PRESERVATION = "disable.nullability.preservation"

  val KEY_USE_ADVANCED_SCHEMA_CONVERSION = "use.advanced.schema.conversion"

  override def getName: String = "Confluent Avro Stream Decoder"

Expand All @@ -55,7 +55,8 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
    KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
    KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false),
    KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false),
-   KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
+   KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
+   KEY_USE_ADVANCED_SCHEMA_CONVERSION -> PropertyMetadata("Use advanced Avro - Spark schema conversion", Some("Stores logical type and default value in Spark column metadata. Default false"), required = false)
  )

  override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)