[SPARK-50507][PYTHON][TESTS][FOLLOW-UP] Add refactored package into pure Python test

### What changes were proposed in this pull request?

This PR is a follow-up of apache/spark#49074 that adds the refactored package into the pure Python test.

### Why are the changes needed?

In order to fix the pure Python build https://github.com/apache/spark/actions/runs/12215379954/job/34077255570.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Will monitor the build.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49106 from HyukjinKwon/SPARK-50507-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
a0x8o committed Dec 9, 2024
1 parent 68cad07 commit df3e8cc
Showing 23 changed files with 537 additions and 5 deletions.
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.avro

import org.apache.avro.Schema

import org.apache.spark.sql.catalyst.util.{ParseMode, PermissiveMode}
import org.apache.spark.unsafe.types.UTF8String

object AvroExpressionEvalUtils {

  def schemaOfAvro(
      avroOptions: AvroOptions,
      parseMode: ParseMode,
      expectedSchema: Schema): UTF8String = {
    val dt = SchemaConverters.toSqlType(
      expectedSchema,
      avroOptions.useStableIdForUnionType,
      avroOptions.stableIdPrefixForUnionType,
      avroOptions.recursiveFieldMaxDepth).dataType
    val schema = parseMode match {
      // With PermissiveMode, the output Catalyst row might contain columns of null values for
      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
      // Therefore we force the schema to be all nullable here.
      case PermissiveMode => dt.asNullable
      case _ => dt
    }
    UTF8String.fromString(schema.sql)
  }
}
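For context on the `PermissiveMode` branch above (not part of the diff): forcing nullability rewrites every field of the resulting Catalyst type as nullable, which changes the DDL string the helper returns. A minimal sketch using only public Spark types (`asNullable` itself is an internal helper, so the nullable copy is built by hand here):

```scala
import org.apache.spark.sql.types._

// A struct whose field is declared non-nullable, as a user-provided Avro record schema might be.
val strict = StructType(Seq(StructField("name", StringType, nullable = false)))

// What PermissiveMode effectively does before producing the DDL: every field becomes nullable.
val relaxed = StructType(strict.fields.map(_.copy(nullable = true)))

println(strict.sql)  // STRUCT<name: STRING NOT NULL>
println(relaxed.sql) // STRUCT<name: STRING>
```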
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.avro

import org.apache.avro.Schema

import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Literal, RuntimeReplaceable}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, ObjectType}

private[sql] case class SchemaOfAvro(
    jsonFormatSchema: String,
    options: Map[String, String])
  extends LeafExpression with RuntimeReplaceable {

  override def dataType: DataType = SQLConf.get.defaultStringType

  override def nullable: Boolean = false

  @transient private lazy val avroOptions = AvroOptions(options)

  @transient private lazy val actualSchema =
    new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)

  @transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)

  @transient private lazy val parseMode: ParseMode = {
    val mode = avroOptions.parseMode
    if (mode != PermissiveMode && mode != FailFastMode) {
      throw QueryCompilationErrors.parseModeUnsupportedError(
        prettyName, mode
      )
    }
    mode
  }

  override def prettyName: String = "schema_of_avro"

  @transient private lazy val avroOptionsObjectType = ObjectType(classOf[AvroOptions])
  @transient private lazy val parseModeObjectType = ObjectType(classOf[ParseMode])
  @transient private lazy val schemaObjectType = ObjectType(classOf[Schema])

  override def replacement: Expression = StaticInvoke(
    AvroExpressionEvalUtils.getClass,
    dataType,
    "schemaOfAvro",
    Seq(
      Literal(avroOptions, avroOptionsObjectType),
      Literal(parseMode, parseModeObjectType),
      Literal(expectedSchema, schemaObjectType)),
    Seq(avroOptionsObjectType, parseModeObjectType, schemaObjectType)
  )
}
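A reading aid, not part of the commit: because the expression is `RuntimeReplaceable`, it never evaluates anything itself; Catalyst substitutes the `replacement` above, a `StaticInvoke` that calls `AvroExpressionEvalUtils.schemaOfAvro` with the pre-computed options, parse mode, and schema. A rough sketch of inspecting that from inside the `org.apache.spark.sql.avro` package (the case class is `private[sql]`):

```scala
// Hypothetical inspection snippet, assuming the two new classes above are on the classpath.
val expr = SchemaOfAvro("""{"type": "int", "name": "id"}""", Map.empty)

// The expression carries no eval logic of its own; its replacement is the StaticInvoke built
// above, which at runtime resolves to AvroExpressionEvalUtils.schemaOfAvro(...).
val replacement = expr.replacement
println(replacement.getClass.getSimpleName) // StaticInvoke
```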
@@ -629,4 +629,40 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
      assert(readbackPerson2.get(2).toString === person2.get(2))
    }
  }

  test("schema_of_avro") {
    val df = spark.range(1)
    val avroIntType = s"""
      |{
      |  "type": "int",
      |  "name": "id"
      |}""".stripMargin
    checkAnswer(df.select(functions.schema_of_avro(avroIntType)), Row("INT"))

    val avroStructType =
      """
        |{
        |  "type": "record",
        |  "name": "person",
        |  "fields": [
        |    {"name": "name", "type": "string"},
        |    {"name": "age", "type": "int"},
        |    {"name": "country", "type": "string"}
        |  ]
        |}""".stripMargin
    checkAnswer(df.select(functions.schema_of_avro(avroStructType)),
      Row("STRUCT<name: STRING NOT NULL, age: INT NOT NULL, country: STRING NOT NULL>"))

    val avroMultiType =
      """
        |{
        |  "type": "record",
        |  "name": "person",
        |  "fields": [
        |    {"name": "u", "type": ["int", "string"]}
        |  ]
        |}""".stripMargin
    checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
      Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
  }
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,6 +25,7 @@ import java.util.{Date, Locale}
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.hdfs.BlockMissingException
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
@@ -319,6 +320,7 @@ class HadoopRDD[K, V](
          null
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: BlockMissingException => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(log"Skipped the rest content in the corrupted file: " +
            log"${MDC(PATH, split.inputSplit)}", e)
@@ -345,6 +347,7 @@
          finished = true
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: BlockMissingException => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(log"Skipped the rest content in the corrupted file: " +
            log"${MDC(PATH, split.inputSplit)}", e)
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.hdfs.BlockMissingException
import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapred.JobConf
@@ -255,6 +256,7 @@ class NewHadoopRDD[K, V](
          null
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: BlockMissingException => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(
            log"Skipped the rest content in the corrupted file: " +
@@ -284,6 +286,7 @@
          finished = true
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: BlockMissingException => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(
            log"Skipped the rest content in the corrupted file: " +
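Both `HadoopRDD` and `NewHadoopRDD` get the same one-line change. The new case has to sit above the `IOException` case because Scala tries match cases top-down and HDFS's `BlockMissingException` extends `IOException`; without it, a missing-block error would be silently treated as a corrupt file whenever `ignoreCorruptFiles` is on. A minimal standalone sketch of that ordering, using a stand-in exception class rather than the HDFS one:

```scala
import java.io.IOException

// Stand-in for org.apache.hadoop.hdfs.BlockMissingException, which also extends IOException.
class FakeBlockMissingException(msg: String) extends IOException(msg)

def handleReadError(e: Throwable, ignoreCorruptFiles: Boolean): String = e match {
  // Must come first: otherwise the branch below would swallow it as a "corrupt file".
  case e: FakeBlockMissingException => throw e
  case _: IOException if ignoreCorruptFiles => "skipped rest of corrupted file"
  case other => throw other
}

// handleReadError(new IOException("truncated block"), ignoreCorruptFiles = true)  // skipped
// handleReadError(new FakeBlockMissingException("block gone"), true)              // rethrows
```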
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
@@ -93,6 +93,8 @@
      url: sql-ref-functions.html
    - text: Identifiers
      url: sql-ref-identifier.html
    - text: IDENTIFIER clause
      url: sql-ref-identifier-clause.html
    - text: Literals
      url: sql-ref-literals.html
    - text: Null Semantics
4 changes: 2 additions & 2 deletions docs/sql-pipe-syntax.md
@@ -179,7 +179,7 @@ Returns all the output rows from the source table unmodified.
For example:

```sql
CREATE TABLE t(a INT, b INT) AS VALUES (1, 2), (3, 4);
CREATE TABLE t AS VALUES (1, 2), (3, 4) AS t(a, b);
TABLE t;

+---+---+
@@ -207,7 +207,7 @@ provided. You may provide the window specification in the `WINDOW` clause.
For example:

```sql
CREATE TABLE t(col INT) AS VALUES (0), (1);
CREATE TABLE t AS VALUES (0), (1) AS t(col);

FROM t
|> SELECT col * 2 AS result;
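A side note on the two doc fixes above (my reading, not stated in the commit): Spark SQL does not accept an explicit column list together with `AS VALUES`/`AS SELECT`, so the examples now name the columns through a table alias instead. A quick way to sanity-check the corrected form, assuming an active `SparkSession` named `spark`:

```scala
// The corrected example from the pipe-syntax docs: column names come from the alias t(a, b).
spark.sql("CREATE TABLE t AS VALUES (1, 2), (3, 4) AS t(a, b)")
spark.sql("TABLE t").show()
// Expected (per the docs): two rows with columns a and b.
```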
1 change: 1 addition & 0 deletions docs/sql-ref.md
@@ -26,6 +26,7 @@ Spark SQL is Apache Spark's module for working with structured data. This guide
* [Data Types](sql-ref-datatypes.html)
* [Datetime Pattern](sql-ref-datetime-pattern.html)
* [Number Pattern](sql-ref-number-pattern.html)
* [Operators](sql-ref-operators.html)
* [Functions](sql-ref-functions.html)
* [Built-in Functions](sql-ref-functions-builtin.html)
* [Scalar User-Defined Functions (UDFs)](sql-ref-functions-udf-scalar.html)
1 change: 1 addition & 0 deletions python/packaging/connect/setup.py
@@ -101,6 +101,7 @@
"pyspark.pandas.tests.connect.groupby",
"pyspark.pandas.tests.connect.indexes",
"pyspark.pandas.tests.connect.io",
"pyspark.pandas.tests.connect.pandas",
"pyspark.pandas.tests.connect.plot",
"pyspark.pandas.tests.connect.resample",
"pyspark.pandas.tests.connect.reshape",
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/session.py
@@ -1044,7 +1044,7 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
default_conf = {
    "spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin",
    "spark.sql.artifact.isolation.enabled": "true",
    "spark.sql.artifact.isolation.always.apply.classloader": "true",
    "spark.sql.artifact.isolation.alwaysApplyClassloader": "true",
}

if "SPARK_TESTING" in os.environ:
28 changes: 28 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -94,4 +94,32 @@ object functions {
  def to_avro(data: Column, jsonFormatSchema: String): Column = {
    Column.fn("to_avro", data, lit(jsonFormatSchema))
  }

  /**
   * Returns schema in the DDL format of the avro schema in JSON string format.
   *
   * @param jsonFormatSchema
   *   the avro schema in JSON string format.
   *
   * @since 4.0.0
   */
  @Experimental
  def schema_of_avro(jsonFormatSchema: String): Column = {
    Column.fn("schema_of_avro", lit(jsonFormatSchema))
  }

  /**
   * Returns schema in the DDL format of the avro schema in JSON string format.
   *
   * @param jsonFormatSchema
   *   the avro schema in JSON string format.
   * @param options
   *   options to control how the Avro record is parsed.
   *
   * @since 4.0.0
   */
  @Experimental
  def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column = {
    Column.fnWithOptions("schema_of_avro", options.asScala.iterator, lit(jsonFormatSchema))
  }
}
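For a feel of the new API, a usage sketch based on the `AvroFunctionsSuite` additions earlier in this diff (assumes an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.avro.functions.schema_of_avro

// schema_of_avro takes the Avro schema as a JSON string and returns its Catalyst DDL form.
val avroSchema = """{"type": "int", "name": "id"}"""
val ddl = spark.range(1)
  .select(schema_of_avro(avroSchema))
  .head()
  .getString(0)
// ddl == "INT"  (record schemas come back as STRUCT<...>, as the new tests show)
```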
@@ -884,6 +884,7 @@ object FunctionRegistry {
    // Avro
    expression[FromAvro]("from_avro"),
    expression[ToAvro]("to_avro"),
    expression[SchemaOfAvro]("schema_of_avro"),

    // Protobuf
    expression[FromProtobuf]("from_protobuf"),
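Registering the expression in `FunctionRegistry` is what exposes it to SQL as well, so the same schema can be inspected without the Scala API. Again a sketch, assuming a `SparkSession` named `spark`:

```scala
// The FunctionRegistry entry above makes schema_of_avro resolvable by name in SQL text.
spark.sql("""SELECT schema_of_avro('{"type": "int", "name": "id"}')""").show(truncate = false)
// Expected single value: INT, matching the DataFrame-API test earlier in the diff.
```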