Skip to content

Commit

Permalink
Minor refactor to InvariantViolationException.scala
Browse files Browse the repository at this point in the history
GitOrigin-RevId: f6a4fe88f8fe6e2b1ce248a2bff0d655cac38a18
  • Loading branch information
maryannxue authored and tdas committed Jun 6, 2023
1 parent a655bed commit 8e8ef79
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import org.apache.spark.sql.delta.schema._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA
import org.apache.spark.sql.delta.stats.{DeltaJobStatisticsTracker, StatisticsCollection}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
Expand Down Expand Up @@ -414,14 +412,9 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
++ statsTrackers,
options = options)
} catch {
case s: SparkException =>
case InnerInvariantViolationException(violationException) =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
val violationException = ExceptionUtils.getRootCause(s)
if (violationException.isInstanceOf[InvariantViolationException]) {
throw violationException
} else {
throw s
}
throw violationException
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,29 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.{DeltaThrowable, DeltaThrowableHelper}
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

/** Thrown when the given data doesn't match the rules defined on the table. */
case class InvariantViolationException(message: String) extends RuntimeException(message)

/**
* Match a [[SparkException]] and return the root cause Exception if it is a
* InvariantViolationException.
*/
object InnerInvariantViolationException {
def unapply(t: Throwable): Option[InvariantViolationException] = t match {
case s: SparkException =>
Option(ExceptionUtils.getRootCause(s)) match {
case Some(i: InvariantViolationException) => Some(i)
case _ => None
}
case _ => None
}
}

object DeltaInvariantViolationException {
def apply(constraint: Constraints.NotNull): DeltaInvariantViolationException = {
new DeltaInvariantViolationException(
Expand Down

0 comments on commit 8e8ef79

Please sign in to comment.