Skip to content

Commit

Permalink
[SPARK-24811][SQL] Avro: add new function from_avro and to_avro
Browse files Browse the repository at this point in the history
1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created apache#21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <[email protected]>

Closes apache#21838 from gengliangwang/from_and_to_avro.

(cherry picked from commit 8817c68)
  • Loading branch information
gengliangwang authored and Yuxiang Chen committed Dec 18, 2018
1 parent 66f45bf commit d6a633f
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 32 deletions.
5 changes: 5 additions & 0 deletions external/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}

case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
extends UnaryExpression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType

override def nullable: Boolean = true

@transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)

@transient private lazy val reader = new GenericDatumReader[Any](avroSchema)

@transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)

@transient private var decoder: BinaryDecoder = _

@transient private var result: Any = _

override def nullSafeEval(input: Any): Any = {
val binary = input.asInstanceOf[Array[Byte]]
decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
result = reader.read(result, decoder)
deserializer.deserialize(result)
}

override def simpleString: String = {
s"from_avro(${child.sql}, ${dataType.simpleString})"
}

override def sql: String = {
s"from_avro(${child.sql}, ${dataType.catalogString})"
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val expr = ctx.addReferenceObj("this", this)
defineCodeGen(ctx, ev, input =>
s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.io.ByteArrayOutputStream

import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.types.{BinaryType, DataType}

case class CatalystDataToAvro(child: Expression) extends UnaryExpression {

override def dataType: DataType = BinaryType

@transient private lazy val avroType =
SchemaConverters.toAvroType(child.dataType, child.nullable)

@transient private lazy val serializer =
new AvroSerializer(child.dataType, avroType, child.nullable)

@transient private lazy val writer =
new GenericDatumWriter[Any](avroType)

@transient private var encoder: BinaryEncoder = _

@transient private lazy val out = new ByteArrayOutputStream

override def nullSafeEval(input: Any): Any = {
out.reset()
encoder = EncoderFactory.get().directBinaryEncoder(out, encoder)
val avroData = serializer.serialize(input)
writer.write(avroData, encoder)
encoder.flush()
out.toByteArray
}

override def simpleString: String = {
s"to_avro(${child.sql}, ${child.dataType.simpleString})"
}

override def sql: String = {
s"to_avro(${child.sql}, ${child.dataType.catalogString})"
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val expr = ctx.addReferenceObj("this", this)
defineCodeGen(ctx, ev, input =>
s"(byte[]) $expr.nullSafeEval($input)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.sql

import org.apache.avro.Schema

import org.apache.spark.annotation.Experimental

package object avro {
/**
* Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
Expand All @@ -36,4 +40,31 @@ package object avro {
@scala.annotation.varargs
def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
}

/**
* Converts a binary column of avro format into its corresponding catalyst value. The specified
* schema must match the read data, otherwise the behavior is undefined: it may fail or return
* arbitrary result.
*
* @param data the binary column.
* @param jsonFormatSchema the avro schema in JSON string format.
*
* @since 2.4.0
*/
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column = {
new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
}

/**
* Converts a column into binary of avro format.
*
* @param data the data column.
*
* @since 2.4.0
*/
@Experimental
def to_avro(data: Column): Column = {
new Column(CatalystDataToAvro(data.expr))
}
}
42 changes: 10 additions & 32 deletions external/avro/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,13 @@
# limitations under the License.
#

# Set everything to be logged to the file core/target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA

#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN


#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = INFO

# Some packages are noisy for no good reason.
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF

log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF

log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF

log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
Loading

0 comments on commit d6a633f

Please sign in to comment.