From d6a633f4f8fa91b6ac1a8eb81f313b09f139a096 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sun, 22 Jul 2018 17:36:57 -0700 Subject: [PATCH] [SPARK-24811][SQL] Avro: add new function from_avro and to_avro 1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value. 2. Add a new function to_avro for converting a column into binary of avro format with the specified schema. I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/ Additional changes In this PR: 1. Add `scalacheck` dependency in pom.xml to resolve the failure. 2. Update the `log4j.properties` to make it consistent with other modules. Unit test Compile with different commands: ``` ./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile ./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile ./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn compile test-compile ``` Author: Gengliang Wang Closes #21838 from gengliangwang/from_and_to_avro. (cherry picked from commit 8817c68f5099901753585716e00281736938bca0) --- external/avro/pom.xml | 5 + .../spark/sql/avro/AvroDataToCatalyst.scala | 68 +++++++ .../spark/sql/avro/CatalystDataToAvro.scala | 69 +++++++ .../org/apache/spark/sql/avro/package.scala | 31 ++++ .../avro/src/test/resources/log4j.properties | 42 +---- .../AvroCatalystDataConversionSuite.scala | 175 ++++++++++++++++++ .../spark/sql/avro/AvroFunctionsSuite.scala | 83 +++++++++ .../expressions/ExpressionEvalHelper.scala | 6 + 8 files changed, 447 insertions(+), 32 deletions(-) create mode 100644 external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala create mode 100644 external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala diff --git a/external/avro/pom.xml b/external/avro/pom.xml index 589dfc70b53d3..3f4c4edc51326 100644 --- a/external/avro/pom.xml +++ b/external/avro/pom.xml @@ -61,6 +61,11 @@ test-jar test + + org.scalacheck + scalacheck_${scala.binary.version} + test + org.apache.spark spark-tags_${scala.binary.version} diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala new file mode 100644 index 0000000000000..6671b3fb8705c --- /dev/null +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + +case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType + + override def nullable: Boolean = true + + @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema) + + @transient private lazy val reader = new GenericDatumReader[Any](avroSchema) + + @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType) + + @transient private var decoder: BinaryDecoder = _ + + @transient private var result: Any = _ + + override def nullSafeEval(input: Any): Any = { + val binary = input.asInstanceOf[Array[Byte]] + decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) + result = reader.read(result, decoder) + deserializer.deserialize(result) + } + + override def simpleString: String = { + s"from_avro(${child.sql}, ${dataType.simpleString})" + } + + override def sql: String = { + s"from_avro(${child.sql}, ${dataType.catalogString})" + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + } +} diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala new file mode 100644 index 0000000000000..a669388e88258 --- /dev/null +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream + +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class CatalystDataToAvro(child: Expression) extends UnaryExpression { + + override def dataType: DataType = BinaryType + + @transient private lazy val avroType = + SchemaConverters.toAvroType(child.dataType, child.nullable) + + @transient private lazy val serializer = + new AvroSerializer(child.dataType, avroType, child.nullable) + + @transient private lazy val writer = + new GenericDatumWriter[Any](avroType) + + @transient private var encoder: BinaryEncoder = _ + + @transient private lazy val out = new ByteArrayOutputStream + + override def nullSafeEval(input: Any): Any = { + out.reset() + encoder = EncoderFactory.get().directBinaryEncoder(out, encoder) + val avroData = serializer.serialize(input) + writer.write(avroData, encoder) + encoder.flush() + out.toByteArray + } + + override def simpleString: String = { + s"to_avro(${child.sql}, ${child.dataType.simpleString})" + } + + override def sql: String = { + s"to_avro(${child.sql}, ${child.dataType.catalogString})" + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expr = ctx.addReferenceObj("this", this) + defineCodeGen(ctx, ev, input => + s"(byte[]) $expr.nullSafeEval($input)") + } +} diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala index b3c8a669cf820..e82651d96a03d 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql +import org.apache.avro.Schema + +import org.apache.spark.annotation.Experimental + package object avro { /** * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using @@ -36,4 +40,31 @@ package object avro { @scala.annotation.varargs def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*) } + + /** + * Converts a binary column of avro format into its corresponding catalyst value. The specified + * schema must match the read data, otherwise the behavior is undefined: it may fail or return + * arbitrary result. + * + * @param data the binary column. + * @param jsonFormatSchema the avro schema in JSON string format. + * + * @since 2.4.0 + */ + @Experimental + def from_avro(data: Column, jsonFormatSchema: String): Column = { + new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema)) + } + + /** + * Converts a column into binary of avro format. + * + * @param data the data column. + * + * @since 2.4.0 + */ + @Experimental + def to_avro(data: Column): Column = { + new Column(CatalystDataToAvro(data.expr)) + } } diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties index c18a724007c27..edbecdae92096 100644 --- a/external/avro/src/test/resources/log4j.properties +++ b/external/avro/src/test/resources/log4j.properties @@ -15,35 +15,13 @@ # limitations under the License. # -# Set everything to be logged to the file core/target/unit-tests.log -log4j.rootLogger=DEBUG, CA, FA - -#Console Appender -log4j.appender.CA=org.apache.log4j.ConsoleAppender -log4j.appender.CA.layout=org.apache.log4j.PatternLayout -log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n -log4j.appender.CA.Threshold = WARN - - -#File Appender -log4j.appender.FA=org.apache.log4j.FileAppender -log4j.appender.FA.append=false -log4j.appender.FA.file=target/unit-tests.log -log4j.appender.FA.layout=org.apache.log4j.PatternLayout -log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Set the logger level of File Appender to WARN -log4j.appender.FA.Threshold = INFO - -# Some packages are noisy for no good reason. -log4j.additivity.parquet.hadoop.ParquetRecordReader=false -log4j.logger.parquet.hadoop.ParquetRecordReader=OFF - -log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false -log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF - -log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF - -log4j.additivity.hive.ql.metadata.Hive=false -log4j.logger.hive.ql.metadata.Hive=OFF \ No newline at end of file +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala new file mode 100644 index 0000000000000..06d5477b2ea45 --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AvroDataToCatalyst, CatalystDataToAvro, RandomDataGenerator} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper { + + private def roundTripTest(data: Literal): Unit = { + val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable) + checkResult(data, avroType.toString, data.eval()) + } + + private def checkResult(data: Literal, schema: String, expected: Any): Unit = { + checkEvaluation( + AvroDataToCatalyst(CatalystDataToAvro(data), schema), + prepareExpectedResult(expected)) + } + + private def assertFail(data: Literal, schema: String): Unit = { + intercept[java.io.EOFException] { + AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval() + } + } + + private val testingTypes = Seq( + BooleanType, + ByteType, + ShortType, + IntegerType, + LongType, + FloatType, + DoubleType, + DecimalType(8, 0), // 32 bits decimal without fraction + DecimalType(8, 4), // 32 bits decimal + DecimalType(16, 0), // 64 bits decimal without fraction + DecimalType(16, 11), // 64 bits decimal + DecimalType(38, 0), + DecimalType(38, 38), + StringType, + BinaryType) + + protected def prepareExpectedResult(expected: Any): Any = expected match { + // Spark decimal is converted to avro string= + case d: Decimal => UTF8String.fromString(d.toString) + // Spark byte and short both map to avro int + case b: Byte => b.toInt + case s: Short => s.toInt + case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult)) + case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult)) + case map: MapData => + val keys = new GenericArrayData( + map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + val values = new GenericArrayData( + map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + new ArrayBasedMapData(keys, values) + case other => other + } + + testingTypes.foreach { dt => + val seed = scala.util.Random.nextLong() + test(s"single $dt with seed $seed") { + val rand = new scala.util.Random(seed) + val data = RandomDataGenerator.forType(dt, rand = rand).get.apply() + val converter = CatalystTypeConverters.createToCatalystConverter(dt) + val input = Literal.create(converter(data), dt) + roundTripTest(input) + } + } + + for (_ <- 1 to 5) { + val seed = scala.util.Random.nextLong() + val rand = new scala.util.Random(seed) + val schema = RandomDataGenerator.randomSchema(rand, 5, testingTypes) + test(s"flat schema ${schema.catalogString} with seed $seed") { + val data = RandomDataGenerator.randomRow(rand, schema) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val input = Literal.create(converter(data), schema) + roundTripTest(input) + } + } + + for (_ <- 1 to 5) { + val seed = scala.util.Random.nextLong() + val rand = new scala.util.Random(seed) + val schema = RandomDataGenerator.randomNestedSchema(rand, 10, testingTypes) + test(s"nested schema ${schema.catalogString} with seed $seed") { + val data = RandomDataGenerator.randomRow(rand, schema) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val input = Literal.create(converter(data), schema) + roundTripTest(input) + } + } + + test("read int as string") { + val data = Literal(1) + val avroTypeJson = + s""" + |{ + | "type": "string", + | "name": "my_string" + |} + """.stripMargin + + // When read int as string, avro reader is not able to parse the binary and fail. + assertFail(data, avroTypeJson) + } + + test("read string as int") { + val data = Literal("abc") + val avroTypeJson = + s""" + |{ + | "type": "int", + | "name": "my_int" + |} + """.stripMargin + + // When read string data as int, avro reader is not able to find the type mismatch and read + // the string length as int value. + checkResult(data, avroTypeJson, 3) + } + + test("read float as double") { + val data = Literal(1.23f) + val avroTypeJson = + s""" + |{ + | "type": "double", + | "name": "my_double" + |} + """.stripMargin + + // When read float data as double, avro reader fails(trying to read 8 bytes while the data have + // only 4 bytes). + assertFail(data, avroTypeJson) + } + + test("read double as float") { + val data = Literal(1.23) + val avroTypeJson = + s""" + |{ + | "type": "float", + | "name": "my_float" + |} + """.stripMargin + + // avro reader reads the first 4 bytes of a double as a float, the result is totally undefined. + checkResult(data, avroTypeJson, 5.848603E35f) + } +} diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala new file mode 100644 index 0000000000000..90a4cd6ccf9dd --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.test.SharedSQLContext + +class AvroFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("roundtrip in to_avro and from_avro - int and string") { + val df = spark.range(10).select('id, 'id.cast("string").as("str")) + + val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b")) + val avroTypeLong = s""" + |{ + | "type": "int", + | "name": "id" + |} + """.stripMargin + val avroTypeStr = s""" + |{ + | "type": "string", + | "name": "str" + |} + """.stripMargin + checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df) + } + + test("roundtrip in to_avro and from_avro - struct") { + val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct")) + val avroStructDF = df.select(to_avro('struct).as("avro")) + val avroTypeStruct = s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + | {"name": "col1", "type": "long"}, + | {"name": "col2", "type": "string"} + | ] + |} + """.stripMargin + checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df) + } + + test("roundtrip in to_avro and from_avro - array with null") { + val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array") + val avroTypeArrStruct = s""" + |[ { + | "type" : "array", + | "items" : [ { + | "type" : "record", + | "name" : "x", + | "fields" : [ { + | "name" : "y", + | "type" : "int" + | } ] + | }, "null" ] + |}, "null" ] + """.stripMargin + val readBackOne = dfOne.select(to_avro($"array").as("avro")) + .select(from_avro($"avro", avroTypeArrStruct).as("array")) + checkAnswer(dfOne, readBackOne) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index b4c8eab19c5cc..fa9c4001a68f6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -68,6 +68,12 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { java.util.Arrays.equals(result, expected) case (result: Double, expected: Spread[Double @unchecked]) => expected.asInstanceOf[Spread[Double]].isWithin(result) + case (result: InternalRow, expected: InternalRow) => + val st = dataType.asInstanceOf[StructType] + assert(result.numFields == st.length && expected.numFields == st.length) + st.zipWithIndex.forall { case (f, i) => + checkResult(result.get(i, f.dataType), expected.get(i, f.dataType), f.dataType) + } case (result: ArrayData, expected: ArrayData) => result.numElements == expected.numElements && { val et = dataType.asInstanceOf[ArrayType].elementType