[SPARK-23372][SQL] Writing empty struct in parquet fails during execution. It should fail earlier in the processing.

## What changes were proposed in this pull request?
Currently we allow writing data frames with an empty schema into file-based data sources for certain file formats, such as JSON and ORC. For formats such as Parquet and text we do raise an error, but at different points of execution: for the text format the error is returned from the driver early in the processing, whereas for formats such as Parquet it is raised from the executor.

**Example**

```scala
spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
```

**Results in**
```
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message spark_schema {
 }
 at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
 at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
 at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
 at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
 at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:225)
 at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
 at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
 at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
 at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:376)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:387)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:278)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:276)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:281)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:206)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:205)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.
```

In this PR, we unify the error processing and raise the error early in the processing, on any attempt to write a dataframe with an empty schema into a file-based data source (ORC, Parquet, text, CSV, JSON, etc.).
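
As an illustration of the new behavior, here is a minimal sketch in the style of the added tests (it uses ScalaTest's `intercept`, and `path` is assumed to be a writable temporary location): with this patch, every file-based format fails fast on the driver with the same `AnalysisException`.

```scala
import org.apache.spark.sql.AnalysisException

// With this patch, all file-based formats reject an empty schema up front,
// before any write task is launched on the executors.
Seq("parquet", "orc", "json", "text", "csv").foreach { format =>
  val e = intercept[AnalysisException] {
    spark.emptyDataFrame.write.format(format).mode("overwrite").save(path)
  }
  assert(e.getMessage.contains(
    "Datasource does not support writing empty or nested empty schemas"))
}
```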

## How was this patch tested?

Unit tests added in `FileBasedDataSourceSuite`.
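
To verify locally, something like `build/sbt "sql/testOnly *FileBasedDataSourceSuite"` should run the new tests in a standard Spark checkout.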

Author: Dilip Biswal <[email protected]>

Closes #20579 from dilipbiswal/spark-23372.
dilipbiswal authored and cloud-fan committed Mar 22, 2018
1 parent 95e51ff commit 5c9eaa6
Showing 3 changed files with 54 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
@@ -1807,6 +1807,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if the dataframe physically has no partitions. This introduces a small behavior change: for self-describing file formats like Parquet and ORC, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent with respect to writing an empty dataframe.
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema in any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.

## Upgrading From Spark SQL 2.2 to 2.3

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

@@ -546,6 +546,7 @@ case class DataSource(
      case dataSource: CreatableRelationProvider =>
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
      case format: FileFormat =>
        DataSource.validateSchema(data.schema)
        planForWritingFileFormat(format, mode, data)
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -719,4 +720,27 @@ object DataSource extends Logging {
    }
    globPath
  }

  /**
   * Called before writing into a FileFormat based data source to make sure the
   * supplied schema is not empty.
   * @param schema schema of the data to be written
   */
  private def validateSchema(schema: StructType): Unit = {
    def hasEmptySchema(schema: StructType): Boolean = {
      schema.size == 0 || schema.find {
        case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
        case _ => false
      }.isDefined
    }

    if (hasEmptySchema(schema)) {
      throw new AnalysisException(
        s"""
           |Datasource does not support writing empty or nested empty schemas.
           |Please make sure the data schema has at least one or more column(s).
         """.stripMargin)
    }
  }
}
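
A side note on the check itself: a schema is rejected if it has no top-level fields, or if any struct field at any nesting depth has no fields. Below is a standalone sketch of the same recursive logic that can be tried in a `spark-shell`; the assertions are illustrative and not part of the patch.

```scala
import org.apache.spark.sql.types._

// Mirrors validateSchema's helper: StructType is a Seq[StructField], so
// isEmpty/exists walk the top-level fields, and the recursion descends
// into nested structs.
def hasEmptySchema(schema: StructType): Boolean =
  schema.isEmpty || schema.exists {
    case StructField(_, s: StructType, _, _) => hasEmptySchema(s)
    case _ => false
  }

assert(hasEmptySchema(StructType(Nil)))                         // no columns at all
assert(hasEmptySchema(new StructType()
  .add("a", IntegerType).add("b", StructType(Nil))))            // nested empty struct
assert(!hasEmptySchema(new StructType().add("a", IntegerType))) // valid schema
```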

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._


class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
@@ -107,6 +108,33 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
}
}

  allFileBasedDataSources.foreach { format =>
    test(s"SPARK-23372 error while writing empty schema files using $format") {
      withTempPath { outputPath =>
        val errMsg = intercept[AnalysisException] {
          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
        }
        assert(errMsg.getMessage.contains(
          "Datasource does not support writing empty or nested empty schemas"))
      }

      // Nested empty schema
      withTempPath { outputPath =>
        val schema = StructType(Seq(
          StructField("a", IntegerType),
          StructField("b", StructType(Nil)),
          StructField("c", IntegerType)
        ))
        val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
        val errMsg = intercept[AnalysisException] {
          df.write.format(format).save(outputPath.toString)
        }
        assert(errMsg.getMessage.contains(
          "Datasource does not support writing empty or nested empty schemas"))
      }
    }
  }

allFileBasedDataSources.foreach { format =>
test(s"SPARK-22146 read files containing special characters using $format") {
withTempDir { dir =>
