diff --git a/external/avro/pom.xml b/external/avro/pom.xml
index 42e865bc38824..ad7df1f49ac45 100644
--- a/external/avro/pom.xml
+++ b/external/avro/pom.xml
@@ -61,6 +61,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
new file mode 100644
index 0000000000000..6671b3fb8705c
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
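+/**
+ * An expression that decodes Avro binary data, described by the given JSON-format schema,
+ * into its corresponding Catalyst value.
+ */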
+case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
+ extends UnaryExpression with ExpectsInputTypes {
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+ override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
+
+ override def nullable: Boolean = true
+
+ @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+
+ @transient private lazy val reader = new GenericDatumReader[Any](avroSchema)
+
+ @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)
+
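+  // The mutable decoder and record below are reused across calls to nullSafeEval, so that a
+  // new decoder and intermediate record are not allocated for every input row.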
+ @transient private var decoder: BinaryDecoder = _
+
+ @transient private var result: Any = _
+
+ override def nullSafeEval(input: Any): Any = {
+ val binary = input.asInstanceOf[Array[Byte]]
+ decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
+ result = reader.read(result, decoder)
+ deserializer.deserialize(result)
+ }
+
+ override def simpleString: String = {
+ s"from_avro(${child.sql}, ${dataType.simpleString})"
+ }
+
+ override def sql: String = {
+ s"from_avro(${child.sql}, ${dataType.catalogString})"
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val expr = ctx.addReferenceObj("this", this)
+ defineCodeGen(ctx, ev, input =>
+ s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
+ }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
new file mode 100644
index 0000000000000..a669388e88258
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
+
+import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
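+/**
+ * An expression that encodes a Catalyst value as Avro binary data, deriving the Avro schema
+ * from the child expression's data type and nullability.
+ */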
+case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
+
+ override def dataType: DataType = BinaryType
+
+ @transient private lazy val avroType =
+ SchemaConverters.toAvroType(child.dataType, child.nullable)
+
+ @transient private lazy val serializer =
+ new AvroSerializer(child.dataType, avroType, child.nullable)
+
+ @transient private lazy val writer =
+ new GenericDatumWriter[Any](avroType)
+
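+  // The encoder and the output buffer are reused across rows; nullSafeEval resets the buffer
+  // before serializing each value.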
+ @transient private var encoder: BinaryEncoder = _
+
+ @transient private lazy val out = new ByteArrayOutputStream
+
+ override def nullSafeEval(input: Any): Any = {
+ out.reset()
+ encoder = EncoderFactory.get().directBinaryEncoder(out, encoder)
+ val avroData = serializer.serialize(input)
+ writer.write(avroData, encoder)
+ encoder.flush()
+ out.toByteArray
+ }
+
+ override def simpleString: String = {
+ s"to_avro(${child.sql}, ${child.dataType.simpleString})"
+ }
+
+ override def sql: String = {
+ s"to_avro(${child.sql}, ${child.dataType.catalogString})"
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val expr = ctx.addReferenceObj("this", this)
+ defineCodeGen(ctx, ev, input =>
+ s"(byte[]) $expr.nullSafeEval($input)")
+ }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index b3c8a669cf820..e82651d96a03d 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql
+import org.apache.avro.Schema
+
+import org.apache.spark.annotation.Experimental
+
package object avro {
/**
* Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
@@ -36,4 +40,41 @@ package object avro {
@scala.annotation.varargs
def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
}
+
+ /**
+   * Converts a binary column of avro format into its corresponding catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
+   * an arbitrary result.
+ *
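+   * For example (as exercised in `AvroFunctionsSuite`):
+   * {{{
+   *   df.select(from_avro($"avro", jsonFormatSchema))
+   * }}}
+   *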
+ * @param data the binary column.
+ * @param jsonFormatSchema the avro schema in JSON string format.
+ *
+ * @since 2.4.0
+ */
+ @Experimental
+ def from_avro(data: Column, jsonFormatSchema: String): Column = {
+ new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
+ }
+
+ /**
+   * Converts a column into a binary column of avro format.
+ *
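+   * For example (as exercised in `AvroFunctionsSuite`):
+   * {{{
+   *   df.select(to_avro($"id"))
+   * }}}
+   *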
+ * @param data the data column.
+ *
+ * @since 2.4.0
+ */
+ @Experimental
+ def to_avro(data: Column): Column = {
+ new Column(CatalystDataToAvro(data.expr))
+ }
}
diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties
index f80a5291bc078..75e3b53a093f6 100644
--- a/external/avro/src/test/resources/log4j.properties
+++ b/external/avro/src/test/resources/log4j.properties
@@ -15,35 +15,14 @@
# limitations under the License.
#
-# Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootLogger=DEBUG, CA, FA
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-#Console Appender
-log4j.appender.CA=org.apache.log4j.ConsoleAppender
-log4j.appender.CA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
-log4j.appender.CA.Threshold = WARN
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
-
-#File Appender
-log4j.appender.FA=org.apache.log4j.FileAppender
-log4j.appender.FA.append=false
-log4j.appender.FA.file=target/unit-tests.log
-log4j.appender.FA.layout=org.apache.log4j.PatternLayout
-log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Set the logger level of File Appender to WARN
-log4j.appender.FA.Threshold = INFO
-
-# Some packages are noisy for no good reason.
-log4j.additivity.parquet.hadoop.ParquetRecordReader=false
-log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
-
-log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
-log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
-
-log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
-
-log4j.additivity.hive.ql.metadata.Hive=false
-log4j.logger.hive.ql.metadata.Hive=OFF
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
new file mode 100644
index 0000000000000..06d5477b2ea45
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AvroDataToCatalyst, CatalystDataToAvro, RandomDataGenerator}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper {
+
+ private def roundTripTest(data: Literal): Unit = {
+ val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable)
+ checkResult(data, avroType.toString, data.eval())
+ }
+
+ private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
+ checkEvaluation(
+ AvroDataToCatalyst(CatalystDataToAvro(data), schema),
+ prepareExpectedResult(expected))
+ }
+
+ private def assertFail(data: Literal, schema: String): Unit = {
+ intercept[java.io.EOFException] {
+ AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval()
+ }
+ }
+
+ private val testingTypes = Seq(
+ BooleanType,
+ ByteType,
+ ShortType,
+ IntegerType,
+ LongType,
+ FloatType,
+ DoubleType,
+    DecimalType(8, 0), // 32-bit decimal without fraction
+    DecimalType(8, 4), // 32-bit decimal
+    DecimalType(16, 0), // 64-bit decimal without fraction
+    DecimalType(16, 11), // 64-bit decimal
+ DecimalType(38, 0),
+ DecimalType(38, 38),
+ StringType,
+ BinaryType)
+
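+  // to_avro maps some Catalyst types onto wider Avro types (e.g. byte/short -> int,
+  // decimal -> string), so the expected values must be adjusted before comparison.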
+ protected def prepareExpectedResult(expected: Any): Any = expected match {
+    // Spark decimal is converted to avro string
+ case d: Decimal => UTF8String.fromString(d.toString)
+ // Spark byte and short both map to avro int
+ case b: Byte => b.toInt
+ case s: Short => s.toInt
+ case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult))
+ case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult))
+ case map: MapData =>
+ val keys = new GenericArrayData(
+ map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
+ val values = new GenericArrayData(
+ map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
+ new ArrayBasedMapData(keys, values)
+ case other => other
+ }
+
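+  // Each generated test uses a fresh random seed and embeds it in the test name, so that a
+  // failure can be reproduced deterministically.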
+ testingTypes.foreach { dt =>
+ val seed = scala.util.Random.nextLong()
+ test(s"single $dt with seed $seed") {
+ val rand = new scala.util.Random(seed)
+ val data = RandomDataGenerator.forType(dt, rand = rand).get.apply()
+ val converter = CatalystTypeConverters.createToCatalystConverter(dt)
+ val input = Literal.create(converter(data), dt)
+ roundTripTest(input)
+ }
+ }
+
+ for (_ <- 1 to 5) {
+ val seed = scala.util.Random.nextLong()
+ val rand = new scala.util.Random(seed)
+ val schema = RandomDataGenerator.randomSchema(rand, 5, testingTypes)
+ test(s"flat schema ${schema.catalogString} with seed $seed") {
+ val data = RandomDataGenerator.randomRow(rand, schema)
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ val input = Literal.create(converter(data), schema)
+ roundTripTest(input)
+ }
+ }
+
+ for (_ <- 1 to 5) {
+ val seed = scala.util.Random.nextLong()
+ val rand = new scala.util.Random(seed)
+ val schema = RandomDataGenerator.randomNestedSchema(rand, 10, testingTypes)
+ test(s"nested schema ${schema.catalogString} with seed $seed") {
+ val data = RandomDataGenerator.randomRow(rand, schema)
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ val input = Literal.create(converter(data), schema)
+ roundTripTest(input)
+ }
+ }
+
+ test("read int as string") {
+ val data = Literal(1)
+ val avroTypeJson =
+ s"""
+ |{
+ | "type": "string",
+ | "name": "my_string"
+ |}
+ """.stripMargin
+
+    // When reading int data as string, the avro reader cannot parse the binary and fails.
+ assertFail(data, avroTypeJson)
+ }
+
+ test("read string as int") {
+ val data = Literal("abc")
+ val avroTypeJson =
+ s"""
+ |{
+ | "type": "int",
+ | "name": "my_int"
+ |}
+ """.stripMargin
+
+    // When reading string data as int, the avro reader cannot detect the type mismatch, and
+    // instead decodes the string's length prefix as the int value.
+ checkResult(data, avroTypeJson, 3)
+ }
+
+ test("read float as double") {
+ val data = Literal(1.23f)
+ val avroTypeJson =
+ s"""
+ |{
+ | "type": "double",
+ | "name": "my_double"
+ |}
+ """.stripMargin
+
+    // When reading float data as double, the avro reader fails (it tries to read 8 bytes while
+    // the data has only 4 bytes).
+ assertFail(data, avroTypeJson)
+ }
+
+ test("read double as float") {
+ val data = Literal(1.23)
+ val avroTypeJson =
+ s"""
+ |{
+ | "type": "float",
+ | "name": "my_float"
+ |}
+ """.stripMargin
+
+    // The avro reader reads the first 4 bytes of a double as a float; the result is undefined.
+ checkResult(data, avroTypeJson, 5.848603E35f)
+ }
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
new file mode 100644
index 0000000000000..90a4cd6ccf9dd
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.functions.struct
+import org.apache.spark.sql.test.SharedSQLContext
+
+class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ test("roundtrip in to_avro and from_avro - int and string") {
+ val df = spark.range(10).select('id, 'id.cast("string").as("str"))
+
+ val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b"))
+ val avroTypeLong = s"""
+ |{
+      |  "type": "long",
+ | "name": "id"
+ |}
+ """.stripMargin
+ val avroTypeStr = s"""
+ |{
+ | "type": "string",
+ | "name": "str"
+ |}
+ """.stripMargin
+ checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df)
+ }
+
+ test("roundtrip in to_avro and from_avro - struct") {
+ val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
+ val avroStructDF = df.select(to_avro('struct).as("avro"))
+ val avroTypeStruct = s"""
+ |{
+ | "type": "record",
+ | "name": "struct",
+ | "fields": [
+ | {"name": "col1", "type": "long"},
+ | {"name": "col2", "type": "string"}
+ | ]
+ |}
+ """.stripMargin
+ checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
+ }
+
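+  // Nullable fields and elements are represented in Avro as unions with "null", which must
+  // also survive the round trip.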
+ test("roundtrip in to_avro and from_avro - array with null") {
+ val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
+ val avroTypeArrStruct = s"""
+ |[ {
+ | "type" : "array",
+ | "items" : [ {
+ | "type" : "record",
+ | "name" : "x",
+ | "fields" : [ {
+ | "name" : "y",
+ | "type" : "int"
+ | } ]
+ | }, "null" ]
+ |}, "null" ]
+ """.stripMargin
+ val readBackOne = dfOne.select(to_avro($"array").as("avro"))
+ .select(from_avro($"avro", avroTypeArrStruct).as("array"))
+ checkAnswer(dfOne, readBackOne)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 14bfa212b5496..d045267ef5d9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -79,6 +79,13 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
java.util.Arrays.equals(result, expected)
case (result: Double, expected: Spread[Double @unchecked]) =>
expected.asInstanceOf[Spread[Double]].isWithin(result)
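+      // InternalRow does not define structural equality, so compare structs field by field.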
+ case (result: InternalRow, expected: InternalRow) =>
+ val st = dataType.asInstanceOf[StructType]
+ assert(result.numFields == st.length && expected.numFields == st.length)
+ st.zipWithIndex.forall { case (f, i) =>
+ checkResult(result.get(i, f.dataType), expected.get(i, f.dataType), f.dataType)
+ }
case (result: ArrayData, expected: ArrayData) =>
result.numElements == expected.numElements && {
val et = dataType.asInstanceOf[ArrayType].elementType