
Feature/138 preserve schema #257

Merged: 14 commits, Feb 15, 2022
README.md: 14 changes (8 additions, 6 deletions)
@@ -123,9 +123,9 @@ If no file exists, the reader will fail.

Any additional properties can be added with the prefix `reader.parquet.options.`. See [Spark Structured Streaming Documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)
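For illustration, a hypothetical fragment that forwards one such option to the Spark Parquet source (the `mergeSchema` option is only an example):

```properties
# Forwarded to Spark's parquet reader as "mergeSchema"
reader.parquet.options.mergeSchema=true
```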

-##### ConfluentAvroStreamDecodingTransformer
-The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamDecodingTransformer` requires the property `reader.kafka.topic` to be set.
+##### ConfluentAvroDecodingTransformer
+The `ConfluentAvroDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroDecodingTransformer` requires the property `reader.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
@@ -142,14 +142,15 @@
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to `true` to ignore the fix for [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and keep the same behaviour as versions up to and including v3.2.2. Default value: `false` |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
+| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to convert the Avro schema using [AdvancedAvroToSparkConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedAvroToSparkConverter.scala), which puts the default value and the underlying Avro type into the struct field metadata. Default: `false` |

For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
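To illustrate how these properties fit together, here is a minimal, hypothetical configuration fragment for the decoder. The transformer id `decoder`, the topic name, and all values are placeholders, and the component registration keys are omitted:

```properties
# Required by the ConfluentAvroDecodingTransformer
reader.kafka.topic=example-topic

# Optional settings from the table above ("decoder" is a hypothetical transformer id)
transformer.decoder.keep.columns=offset,partition
transformer.decoder.disable.nullability.preservation=false
transformer.decoder.use.advanced.schema.conversion=true
transformer.decoder.schema.registry.basic.auth.user.info.file=/etc/hyperdrive/userinfo.txt

# Extra keys under this prefix are forwarded verbatim to the schema registry config
transformer.decoder.schema.registry.options.basic.auth.credentials.source=USER_INFO
```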

-##### ConfluentAvroStreamEncodingTransformer
-The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamEncodingTransformer` requires the property `writer.kafka.topic` to be set.
+##### ConfluentAvroEncodingTransformer
+The `ConfluentAvroEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroEncodingTransformer` requires the property `writer.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
@@ -164,6 +165,7 @@
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value null in the Avro schema. Nested column names should be concatenated with a dot (`.`) |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
+| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to generate the Avro schema using [AdvancedSparkToAvroConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedSparkToAvroConverter.scala), which reads the default value and the underlying Avro type from the struct field metadata. Default: `false` |

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
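Analogously, a hypothetical fragment for the encoder (again, the transformer id and all values are placeholders):

```properties
# Required by the ConfluentAvroEncodingTransformer
writer.kafka.topic=example-topic

# Optional settings from the table above ("encoder" is a hypothetical transformer id)
transformer.encoder.key.optional.fields=key.id,key.timestamp
transformer.encoder.use.advanced.schema.conversion=true

# Extra keys under this prefix are forwarded verbatim to the schema registry config
transformer.encoder.schema.registry.options.basic.auth.credentials.source=USER_INFO
```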

@@ -0,0 +1,15 @@
#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter
@@ -0,0 +1,145 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.abris.avro.sql.SchemaConverter
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

// scalastyle:off
class AdvancedAvroToSparkConverter extends SchemaConverter {
  override val shortName: String = AdvancedAvroToSparkConverter.name
  private lazy val objectMapper = new ObjectMapper()

  case class SchemaType(dataType: DataType, nullable: Boolean, avroType: Option[Schema])

  /**
   * This function takes an avro schema and returns a sql schema.
   */
  override def toSqlType(avroSchema: Schema): DataType = {
    toSqlTypeHelper(avroSchema, Set.empty).dataType
  }

  def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
    avroSchema.getType match {
      case INT
        | STRING
        | BOOLEAN
        | BYTES
        | FIXED
        | DOUBLE
        | FLOAT
        | LONG
        | ENUM =>
        val originalSchemaType = SchemaConverters.toSqlType(avroSchema)
        SchemaType(originalSchemaType.dataType, originalSchemaType.nullable, Option(avroSchema))

      case RECORD =>
        if (existingRecordNames.contains(avroSchema.getFullName)) {
          throw new IncompatibleSchemaException(s"""
            |Found recursive reference in Avro schema, which can not be processed by Spark:
            |${avroSchema.toString(true)}
          """.stripMargin)
        }
        val newRecordNames = existingRecordNames + avroSchema.getFullName
        val fields = avroSchema.getFields.asScala.map { f =>
          val metadataBuilder = new MetadataBuilder()
          val defaultJsonOpt = Option(JacksonUtils.toJsonNode(f.defaultVal()))
          val metadataBuilderWithDefault = defaultJsonOpt match {
            case Some(defaultJson) =>
              val baos = new ByteArrayOutputStream()
              objectMapper.writeValue(baos, defaultJson)
              val r = metadataBuilder.putString(DefaultValueKey, baos.toString)
              baos.close()
              r
            case None => metadataBuilder
          }

          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
          schemaType.avroType
            .map(_.toString)
            .map(schema => metadataBuilderWithDefault.putString(AvroTypeKey, schema).build())
            .map(metadata => StructField(f.name, schemaType.dataType, schemaType.nullable, metadata))
            .getOrElse(StructField(f.name, schemaType.dataType, schemaType.nullable, metadataBuilderWithDefault.build()))
        }

        SchemaType(StructType(fields), nullable = false, None)

      case ARRAY =>
        val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
        SchemaType(
          ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case MAP =>
        val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
        SchemaType(
          MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case UNION =>
        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
          // In case of a union with null, eliminate it and make a recursive call
          val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
          if (remainingUnionTypes.size == 1) {
            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
          } else {
            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
              .copy(nullable = true)
          }
        } else avroSchema.getTypes.asScala.map(_.getType) match {
          case Seq(t1) =>
            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            SchemaType(LongType, nullable = false, Option(avroSchema))
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            SchemaType(DoubleType, nullable = false, Option(avroSchema))
          case _ =>
            // Convert complex unions to struct types where field names are member0, member1, etc.
            // This is consistent with the behavior when converting between Avro and Parquet.
            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
              case (s, i) =>
                val schemaType = toSqlTypeHelper(s, existingRecordNames)
                schemaType.avroType
                  .map(_.toString)
                  .map(schema => new MetadataBuilder().putString(AvroTypeKey, schema).build())
                  .map(metadata => StructField(s"member$i", schemaType.dataType, schemaType.nullable, metadata))
                  // All fields are nullable because only one of them is set at a time
                  .getOrElse(StructField(s"member$i", schemaType.dataType, nullable = true))
            }

            SchemaType(StructType(fields), nullable = false, None)
        }

      case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
    }
  }
}

// scalastyle:on
object AdvancedAvroToSparkConverter {
  val name = "advanced"
}
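As an aside (not part of the diff), a minimal sketch of what this converter produces, assuming the class above is on the classpath; the object name and schema are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter

object AdvancedAvroToSparkConverterSketch extends App {
  // A record with one nullable int field that carries a default value.
  val avroSchema = new Schema.Parser().parse(
    """{"type": "record", "name": "Example", "fields": [
      |  {"name": "amount", "type": ["null", "int"], "default": null}
      |]}""".stripMargin)

  val sparkType = new AdvancedAvroToSparkConverter().toSqlType(avroSchema)

  // Each StructField's metadata now carries the serialized Avro type and the
  // default value, under the keys defined in SparkMetadataKeys.
  sparkType.asInstanceOf[StructType].fields.foreach { f =>
    println(s"${f.name}: ${f.dataType}, nullable=${f.nullable}, metadata=${f.metadata.json}")
  }
}
```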
@@ -0,0 +1,142 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.LogicalTypes.TimestampMillis
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.avro.{JsonProperties, LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper

import java.util.Objects
import scala.util.Try
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
  private lazy val nullSchema = Schema.create(Schema.Type.NULL)
  private lazy val objectMapper = new ObjectMapper()

  override def apply(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
    toAvroType(catalystType, None, nullable, None, recordName, nameSpace)

  // scalastyle:off
  private def toAvroType(
    catalystType: DataType,
    avroSchema: Option[Schema],
    nullable: Boolean = false,
    defaultValue: Option[Object] = None,
    recordName: String = "topLevelRecord",
    nameSpace: String = ""): Schema = {
    val builder = SchemaBuilder.builder()

    val schema = catalystType match {
      case BooleanType
        | ByteType
        | ShortType
        | IntegerType
        | LongType
        | DateType
        | FloatType
        | DoubleType
        | StringType
        // nullability is handled later in this method, thus pass nullable = false
        => SchemaConverters.toAvroType(catalystType, nullable = false, recordName, nameSpace)
      case TimestampType => avroSchema match {
        case Some(schema) if schema.getLogicalType.isInstanceOf[TimestampMillis] =>
          LogicalTypes.timestampMillis().addToSchema(builder.longType())
        case _ => LogicalTypes.timestampMicros().addToSchema(builder.longType())
      }
      case d: DecimalType => avroSchema match {
        case Some(schema) if schema.getType == BYTES =>
          val avroType = LogicalTypes.decimal(d.precision, d.scale)
          avroType.addToSchema(SchemaBuilder.builder().bytesType())
        case _ => getDecimalFixedType(d, avroSchema, nameSpace, recordName)
      }
      case BinaryType => avroSchema match {
        case Some(schema) if schema.getType == FIXED =>
          val name = getFixedName(recordName, nameSpace)
          builder
            .fixed(name)
            .size(schema.getFixedSize)
        case _ => builder.bytesType()
      }
      case ArrayType(et, containsNull) =>
        builder.array()
          .items(toAvroType(et, avroSchema, containsNull, defaultValue, recordName, nameSpace))
      case MapType(StringType, vt, valueContainsNull) =>
        builder.map()
          .values(toAvroType(vt, avroSchema, valueContainsNull, defaultValue, recordName, nameSpace))
      case st: StructType =>
        val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
        val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
        st.foreach { f =>
          val schema = Try(f.metadata.getString(AvroTypeKey)).toOption
            .map(schema => new Schema.Parser().parse(schema))
          val defaultValueOpt = Try(f.metadata.getString(DefaultValueKey))
            .flatMap(defaultJsonString => Try {
              val jsonNode = objectMapper.readTree(defaultJsonString)
              JacksonUtils.toObject(jsonNode)
            }).toOption
          val fieldAvroType =
            toAvroType(f.dataType, schema, f.nullable, defaultValueOpt, f.name, childNameSpace)
          defaultValueOpt match {
            case Some(defaultObject) if !Objects.equals(defaultObject, JsonProperties.NULL_VALUE) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(defaultObject)
            case Some(_) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(null)
            case _ => fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
          }
        }
        fieldsAssembler.endRecord()

      // This should never happen.
      case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
    }
    if (nullable) {
      defaultValue match {
        case Some(value) if !value.isInstanceOf[JsonProperties.Null] => Schema.createUnion(schema, nullSchema)
        case _ => Schema.createUnion(nullSchema, schema)
      }
    } else {
      schema
    }
  }

  // scalastyle:on
  private def getDecimalFixedType(d: DecimalType, avroSchema: Option[Schema], nameSpace: String, recordName: String) = {
    val avroType = LogicalTypes.decimal(d.precision, d.scale)
    val name = getFixedName(recordName, nameSpace)
    val minBytes = minBytesForPrecision(d.precision)
    val size = avroSchema.map { schema =>
      if (schema.getFixedSize > minBytes) schema.getFixedSize else minBytes
    }.getOrElse {
      minBytes
    }
    avroType.addToSchema(SchemaBuilder.fixed(name).size(size))
  }

  private def getFixedName(recordName: String, nameSpace: String) = {
    // Need to avoid naming conflict for the fixed fields
    nameSpace match {
      case "" => s"$recordName.fixed"
      case _ => s"$nameSpace.$recordName.fixed"
    }
  }
}
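A sketch of the round trip (also not part of the diff): metadata written by `AdvancedAvroToSparkConverter` is read back by `AdvancedSparkToAvroConverter`, so the regenerated Avro schema keeps the original `["null", "int"]` union and its default value; the object name and schema are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.{AdvancedAvroToSparkConverter, AdvancedSparkToAvroConverter}

object RoundTripSketch extends App {
  val original = new Schema.Parser().parse(
    """{"type": "record", "name": "Example", "fields": [
      |  {"name": "amount", "type": ["null", "int"], "default": null}
      |]}""".stripMargin)

  // Avro -> Spark: field metadata records the underlying type and default.
  val sparkType = new AdvancedAvroToSparkConverter().toSqlType(original).asInstanceOf[StructType]

  // Spark -> Avro: the metadata is read back, restoring type order and default.
  val restored = AdvancedSparkToAvroConverter(sparkType, nullable = false, "Example", "")
  println(restored.toString(true))
}
```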
@@ -113,6 +113,7 @@ object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with Co
    override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
    override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
    override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
+   override val useAdvancedSchemaConversion: String = KEY_USE_ADVANCED_SCHEMA_CONVERSION
  }

  override def apply(config: Configuration): StreamTransformer = {
@@ -32,7 +32,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes

  val KEY_KEEP_COLUMNS = "keep.columns"
  val KEY_DISABLE_NULLABILITY_PRESERVATION = "disable.nullability.preservation"
-
+  val KEY_USE_ADVANCED_SCHEMA_CONVERSION = "use.advanced.schema.conversion"

  override def getName: String = "Confluent Avro Stream Decoder"

@@ -55,7 +55,8 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
    KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
    KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false),
    KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false),
-   KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
+   KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
+   KEY_USE_ADVANCED_SCHEMA_CONVERSION -> PropertyMetadata("Use advanced Avro - Spark schema conversion", Some("Stores logical type and default value in Spark column metadata. Default false"), required = false)
  )

  override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)