[SPARK-36336][SQL] Add new exception of base exception used in QueryExecutionErrors

### What changes were proposed in this pull request?
When we refactor the query execution errors in `QueryExecutionErrors` to use error classes, we need to define exceptions that mix `SparkThrowable` into a base exception type,
following the example of [SparkArithmeticException](https://github.com/apache/spark/blob/f90eb6a5db0778fd18b0b544f93eac3103bbf03b/core/src/main/scala/org/apache/spark/SparkException.scala#L75).
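For context, here is a minimal sketch (illustrative only, not code from this commit) of how one of these exceptions is constructed and how a caller can inspect it through the `SparkThrowable` interface. The surrounding driver object and the catch-side handling are assumptions about typical caller code:

```scala
import org.apache.spark.{SparkDateTimeException, SparkThrowable}

// Hypothetical driver code; only SparkDateTimeException and SparkThrowable
// come from this change set.
object ErrorClassSketch {
  def main(args: Array[String]): Unit = {
    try {
      // The message text and SQLSTATE are resolved from
      // core/src/main/resources/error/error-classes.json.
      throw new SparkDateTimeException(
        errorClass = "INVALID_FRACTION_OF_SECOND",
        messageParameters = Array.empty)
    } catch {
      case e: SparkThrowable =>
        // Expected to print: INVALID_FRACTION_OF_SECOND / 22023
        println(s"${e.getErrorClass} / ${e.getSqlState}")
    }
  }
}
```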

Add the following `SparkXXXException` classes:
- `SparkClassNotFoundException`
- `SparkConcurrentModificationException`
- `SparkDateTimeException`
- `SparkFileAlreadyExistsException`
- `SparkFileNotFoundException`
- `SparkNoSuchMethodException`
- `SparkIndexOutOfBoundsException`
- `SparkIOException`
- `SparkSecurityException`
- `SparkSQLException`
- `SparkSQLFeatureNotSupportedException`

Refactor some of the exceptions in `QueryExecutionErrors` to use the error classes and the new exception types, which also exercises the new exceptions in the existing tests.

Some related exception types were already added by [PR](#33538):

- `SparkUnsupportedOperationException`
- `SparkIllegalStateException`
- `SparkNumberFormatException`
- `SparkIllegalArgumentException`
- `SparkArrayIndexOutOfBoundsException`
- `SparkNoSuchElementException`

### Why are the changes needed?
[SPARK-36336](https://issues.apache.org/jira/browse/SPARK-36336)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.
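As an illustration of the kind of assertion the new exceptions enable, here is a hypothetical ScalaTest-style sketch (not a test added by this commit; the suite name and the package of `QueryExecutionErrors` are assumptions):

```scala
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.SparkDateTimeException
import org.apache.spark.sql.errors.QueryExecutionErrors // package assumed

// Hypothetical suite: the error class and SQLSTATE can now be asserted
// directly instead of matching on the message text.
class ErrorClassSketchSuite extends AnyFunSuite {
  test("invalidFractionOfSecondError carries an error class") {
    val e = intercept[SparkDateTimeException] {
      throw QueryExecutionErrors.invalidFractionOfSecondError()
    }
    assert(e.getErrorClass === "INVALID_FRACTION_OF_SECOND")
    assert(e.getSqlState === "22023")
  }
}
```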

Closes #33573 from Peng-Lei/SPARK-36336.

Authored-by: PengLei <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Peng-Lei authored and HyukjinKwon committed Aug 25, 2021
1 parent de932f5 commit 3e32ea1
Showing 5 changed files with 205 additions and 24 deletions.
41 changes: 41 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -3,6 +3,9 @@
"message" : [ "Field name %s is ambiguous and has %s matching fields in the struct." ],
"sqlState" : "42000"
},
"CONCURRENT_QUERY_ERROR" : {
"message" : [ "Another instance of this query was just started by a concurrent session." ]
},
"DIVIDE_BY_ZERO" : {
"message" : [ "divide by zero" ],
"sqlState" : "22012"
@@ -11,6 +14,13 @@
"message" : [ "Found duplicate keys '%s'" ],
"sqlState" : "23000"
},
"FAILED_RENAME_PATH" : {
"message" : [ "Failed to rename %s to %s as destination already exists" ],
"sqlState" : "22023"
},
"FAILED_SET_ORIGINAL_PERMISSION_BACK" : {
"message" : [ "Failed to set original permission %s back to the created path: %s. Exception: %s" ]
},
"GROUPING_COLUMN_MISMATCH" : {
"message" : [ "Column of grouping (%s) can't be found in grouping columns %s" ],
"sqlState" : "42000"
@@ -29,17 +39,32 @@
"message" : [ "Invalid pivot column '%s'. Pivot columns must be comparable." ],
"sqlState" : "42000"
},
"INCOMPATIBLE_DATASOURCE_REGISTER" : {
"message" : [ "Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: %s" ]
},
"INDEX_OUT_OF_BOUNDS" : {
"message" : [ "Index %s must be between 0 and the length of the ArrayData." ],
"sqlState" : "22023"
},
"INVALID_FIELD_NAME" : {
"message" : [ "Field name %s is invalid: %s is not a struct." ],
"sqlState" : "42000"
},
"INVALID_FRACTION_OF_SECOND" : {
"message" : [ "The fraction of sec must be zero. Valid range is [0, 60]." ],
"sqlState" : "22023"
},
"INVALID_JSON_SCHEMA_MAPTYPE" : {
"message" : [ "Input schema %s can only contain StringType as a key type for a MapType." ]
},
"MISSING_COLUMN" : {
"message" : [ "cannot resolve '%s' given input columns: [%s]" ],
"sqlState" : "42000"
},
"MISSING_METHOD" : {
"message" : [ "A method named \"%s\" is not declared in any enclosing class nor any supertype" ],
"sqlState" : "42000"
},
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [ "Unknown static partition column: %s" ],
"sqlState" : "42000"
@@ -56,13 +81,29 @@
"message" : [ "Invalid pivot value '%s': value data type %s does not match pivot column data type %s" ],
"sqlState" : "42000"
},
"RENAME_SRC_PATH_NOT_FOUND" : {
"message" : [ "Failed to rename as %s was not found" ],
"sqlState" : "22023"
},
"SECOND_FUNCTION_ARGUMENT_NOT_INTEGER" : {
"message" : [ "The second argument of '%s' function needs to be an integer." ],
"sqlState" : "22023"
},
"UNABLE_TO_ACQUIRE_MEMORY" : {
"message" : [ "Unable to acquire %s bytes of memory, got %s" ]
},
"UNRECOGNIZED_SQL_TYPE" : {
"message" : [ "Unrecognized SQL type %s" ],
"sqlState" : "42000"
},
"UNSUPPORTED_LITERAL_TYPE" : {
"message" : [ "Unsupported literal type %s %s" ],
"sqlState" : "0A000"
},
"UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER" : {
"message" : [ "The target JDBC server does not support transaction and can only support ALTER TABLE with a single action." ],
"sqlState" : "0A000"
},
"WRITING_JOB_ABORTED" : {
"message" : [ "Writing job aborted" ],
"sqlState" : "40000"
145 changes: 145 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
@@ -17,6 +17,13 @@

package org.apache.spark

import java.io.{FileNotFoundException, IOException}
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.DateTimeException
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.FileAlreadyExistsException

class SparkException(
message: String,
cause: Throwable,
@@ -79,3 +86,141 @@ class SparkArithmeticException(errorClass: String, messageParameters: Array[String])
override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Class not found exception thrown from Spark with an error class.
*/
class SparkClassNotFoundException(
errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends ClassNotFoundException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Concurrent modification exception thrown from Spark with an error class.
*/
class SparkConcurrentModificationException(
errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends ConcurrentModificationException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Datetime exception thrown from Spark with an error class.
*/
class SparkDateTimeException(errorClass: String, messageParameters: Array[String])
extends DateTimeException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Hadoop file already exists exception thrown from Spark with an error class.
*/
class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Array[String])
extends FileAlreadyExistsException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* File not found exception thrown from Spark with an error class.
*/
class SparkFileNotFoundException(errorClass: String, messageParameters: Array[String])
extends FileNotFoundException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* No such method exception thrown from Spark with an error class.
*/
class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[String])
extends NoSuchMethodException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Index out of bounds exception thrown from Spark with an error class.
*/
class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Array[String])
extends IndexOutOfBoundsException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* IO exception thrown from Spark with an error class.
*/
class SparkIOException(errorClass: String, messageParameters: Array[String])
extends IOException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

class SparkRuntimeException(
errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends RuntimeException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Security exception thrown from Spark with an error class.
*/
class SparkSecurityException(errorClass: String, messageParameters: Array[String])
extends SecurityException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* SQL exception thrown from Spark with an error class.
*/
class SparkSQLException(errorClass: String, messageParameters: Array[String])
extends SQLException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* SQL feature not supported exception thrown from Spark with an error class.
*/
class SparkSQLFeatureNotSupportedException(errorClass: String, messageParameters: Array[String])
extends SQLFeatureNotSupportedException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}
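With `SparkThrowable` mixed into each of these classes, downstream code can branch on stable error classes and SQLSTATEs instead of parsing message strings. A minimal sketch of that style of handling follows; the `FailureClassifier` object and its `describe` helper are hypothetical caller code, not part of this commit, while the `FAILED_RENAME_PATH` error class is one of those added to error-classes.json above:

```scala
import org.apache.spark.SparkThrowable

object FailureClassifier {
  // Classify a failure by its error class rather than by parsing the
  // parameterized message text.
  def describe(t: Throwable): String = t match {
    case st: SparkThrowable if st.getErrorClass == "FAILED_RENAME_PATH" =>
      s"rename conflict (SQLSTATE ${st.getSqlState})"
    case st: SparkThrowable =>
      s"Spark error class: ${st.getErrorClass}"
    case other =>
      s"unclassified error: ${other.getMessage}"
  }
}
```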
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

import org.apache.spark.{Partition, SparkArithmeticException, SparkException, SparkUpgradeException}
import org.apache.spark.{Partition, SparkArithmeticException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIndexOutOfBoundsException, SparkNoSuchMethodException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUpgradeException}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
@@ -157,7 +157,7 @@ object QueryExecutionErrors {
}

def invalidFractionOfSecondError(): DateTimeException = {
new DateTimeException("The fraction of sec must be zero. Valid range is [0, 60].")
new SparkDateTimeException(errorClass = "INVALID_FRACTION_OF_SECOND", Array.empty)
}

def overflowInSumOfDecimalError(): ArithmeticException = {
@@ -179,7 +179,8 @@
}

def literalTypeUnsupportedError(v: Any): RuntimeException = {
new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
new SparkRuntimeException("UNSUPPORTED_LITERAL_TYPE",
Array(v.getClass.toString, v.toString))
}

def noDefaultForDataTypeError(dataType: DataType): RuntimeException = {
@@ -261,8 +262,7 @@
}

def methodNotDeclaredError(name: String): Throwable = {
new NoSuchMethodException(s"""A method named "$name" is not declared """ +
"in any enclosing class nor any supertype")
new SparkNoSuchMethodException(errorClass = "MISSING_METHOD", Array(name))
}

def constructorNotFoundError(cls: String): Throwable = {
@@ -449,11 +449,7 @@
}

def incompatibleDataSourceRegisterError(e: Throwable): Throwable = {
new ClassNotFoundException(
s"""
|Detected an incompatible DataSourceRegister. Please remove the incompatible
|library from classpath or upgrade it. Error: ${e.getMessage}
""".stripMargin, e)
new SparkClassNotFoundException("INCOMPATIBLE_DATASOURCE_REGISTER", Array(e.getMessage), e)
}

def unrecognizedFileFormatError(format: String): Throwable = {
@@ -675,7 +671,7 @@
}

def unrecognizedSqlTypeError(sqlType: Int): Throwable = {
new SQLException(s"Unrecognized SQL type $sqlType")
new SparkSQLException(errorClass = "UNRECOGNIZED_SQL_TYPE", Array(sqlType.toString))
}

def unsupportedJdbcTypeError(content: String): Throwable = {
@@ -702,8 +698,8 @@
}

def transactionUnsupportedByJdbcServerError(): Throwable = {
new SQLFeatureNotSupportedException("The target JDBC server does not support " +
"transaction and can only support ALTER TABLE with a single action.")
new SparkSQLFeatureNotSupportedException(errorClass = "UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER",
Array.empty)
}

def dataTypeUnsupportedYetError(dataType: DataType): Throwable = {
@@ -952,8 +948,7 @@
}

def concurrentQueryInstanceError(): Throwable = {
new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
new SparkConcurrentModificationException("CONCURRENT_QUERY_ERROR", Array.empty)
}

def cannotParseJsonArraysAsStructsError(): Throwable = {
@@ -1233,8 +1228,7 @@
}

def indexOutOfBoundsOfArrayDataError(idx: Int): Throwable = {
new IndexOutOfBoundsException(
s"Index $idx must be between 0 and the length of the ArrayData.")
new SparkIndexOutOfBoundsException(errorClass = "INDEX_OUT_OF_BOUNDS", Array(idx.toString))

MaxGekk (Member) commented on Sep 12, 2022:
@Peng-Lei @HyukjinKwon Any ideas how to trigger the error from user space? If it is not possible, let's replace it by an internal error.

MaxGekk (Member) commented on Sep 12, 2022:
I would like to convert this to an error class. Please review the PR #37857.

}

def malformedRecordsDetectedInRecordParsingError(e: BadRecordException): Throwable = {
@@ -1354,16 +1348,17 @@
}

def renamePathAsExistsPathError(srcPath: Path, dstPath: Path): Throwable = {
new FileAlreadyExistsException(
s"Failed to rename $srcPath to $dstPath as destination already exists")
new SparkFileAlreadyExistsException(errorClass = "FAILED_RENAME_PATH",
Array(srcPath.toString, dstPath.toString))
}

def renameAsExistsPathError(dstPath: Path): Throwable = {
new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
}

def renameSrcPathNotFoundError(srcPath: Path): Throwable = {
new FileNotFoundException(s"Failed to rename as $srcPath was not found")
new SparkFileNotFoundException(errorClass = "RENAME_SRC_PATH_NOT_FOUND",
Array(srcPath.toString))
}

def failedRenameTempFileError(srcPath: Path, dstPath: Path): Throwable = {
@@ -1560,8 +1555,8 @@
permission: FsPermission,
path: Path,
e: Throwable): Throwable = {
new SecurityException(s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
new SparkSecurityException(errorClass = "FAILED_SET_ORIGINAL_PERMISSION_BACK",
Array(permission.toString, path.toString, e.getMessage))
}

def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = {
(diff in a SQL test golden result file)
@@ -103,7 +103,7 @@ SELECT make_timestamp(2021, 07, 11, 6, 30, 60.007)
-- !query schema
struct<>
-- !query output
java.time.DateTimeException
org.apache.spark.SparkDateTimeException
The fraction of sec must be zero. Valid range is [0, 60].


(diff in a second SQL test golden result file)
@@ -103,7 +103,7 @@ SELECT make_timestamp(2021, 07, 11, 6, 30, 60.007)
-- !query schema
struct<>
-- !query output
java.time.DateTimeException
org.apache.spark.SparkDateTimeException
The fraction of sec must be zero. Valid range is [0, 60].


