Fix Spark Master build #3928
base: master
Conversation
object DeltaThrowableHelperShims {
  /**
   * Handles a breaking change (SPARK-46810) between Spark 3.5 and Spark Master (4.0) where
   * `error-classes.json` was renamed to `error-conditions.json`.
   */
  val SPARK_ERROR_CLASS_SOURCE_FILE = "error/error-conditions.json"

def showColumnsWithConflictDatabasesError(
Needed because of apache/spark@53c1f31
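A minimal sketch (not the actual Delta helper; names and wiring assumed) of how the shimmed constant can be consumed: the caller resolves the error JSON from the classpath and never names the file itself, so only this shim differs between the Spark 3.5 and Spark master builds.

import java.net.URL

object ErrorSourceLookupSketch {
  // Hypothetical helper: resolve the Spark error JSON via the shimmed constant,
  // regardless of whether the resource is error-classes.json (Spark 3.5) or
  // error-conditions.json (Spark master). Import of DeltaThrowableHelperShims
  // omitted because its package differs per Spark profile.
  def errorFileUrl(): URL =
    getClass.getClassLoader
      .getResource(DeltaThrowableHelperShims.SPARK_ERROR_CLASS_SOURCE_FILE)
}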
@@ -72,8 +72,7 @@ import org.apache.spark.sql.types._
 * A SQL parser that tries to parse Delta commands. If failing to parse the SQL text, it will
 * forward the call to `delegate`.
 */
class DeltaSqlParser(val delegateSpark: ParserInterface) extends ParserInterfaceShims {
  private val delegate = ParserInterfaceShims(delegateSpark)
class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
Needed because of apache/spark@8791767
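For illustration only (package name and wiring assumed, not taken from this PR): on Spark master the parser can be built directly on Spark's own ParserInterface, with the session's existing parser as the delegate.

import io.delta.sql.parser.DeltaSqlParser
import org.apache.spark.sql.SparkSession

object DeltaParserWiringSketch {
  // Sketch only: wrap the session's current SQL parser so Delta statements are
  // handled first and everything else is forwarded to the delegate.
  def installDeltaParser(spark: SparkSession): DeltaSqlParser =
    new DeltaSqlParser(spark.sessionState.sqlParser)
}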
def unapply(plan: Aggregate): Option[TahoeLogFileIndex] = {
  // GROUP BY is not supported. All AggregateExpressions must be stats optimizable.
      isStatsOptimizable(aggExprs) => Some(fileIndex)
    case _ => None
Needed because of apache/spark@d6b7334
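A hedged sketch of the "stats optimizable" idea referenced by the code comment above (simplified; the real predicate in Delta checks more than this): only aggregates with no GROUP BY, whose functions can be answered from per-file statistics, qualify.

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Max, Min}
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

object StatsOptimizableSketch {
  // Sketch only: no grouping keys, and every aggregate function must be one
  // that Delta file statistics (numRecords, min, max) can answer.
  def isCandidate(plan: Aggregate): Boolean =
    plan.groupingExpressions.isEmpty &&
      plan.aggregateExpressions
        .flatMap(_.collect { case ae: AggregateExpression => ae })
        .forall { ae =>
          ae.aggregateFunction match {
            case _: Count | _: Min | _: Max => true
            case _ => false
          }
        }
}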
@@ -182,7 +182,7 @@ class DeltaJobStatisticsTracker(
  override def newTaskInstance(): WriteTaskStatsTracker = {
    val rootPath = new Path(rootUri)
    val hadoopConf = srlHadoopConf.value
    new DeltaTaskStatisticsTracker(dataCols, statsColExpr, rootPath, hadoopConf)
    new DeltaTaskStatisticsTracker(dataCols, prepareForEval(statsColExpr), rootPath, hadoopConf)
to_json is RuntimeReplaceable now. We need to replace it before we try to execute it. This was the narrowest waist I could find.
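A minimal sketch of the idea behind prepareForEval (an assumption about what it does; the actual helper in this PR may differ): substitute RuntimeReplaceable nodes with their replacement expression before the stats expression is evaluated.

import org.apache.spark.sql.catalyst.expressions.{Expression, RuntimeReplaceable}

object EvalPrepSketch {
  // Sketch only: rewrite RuntimeReplaceable expressions (e.g. to_json on Spark
  // master) into their concrete replacement so the tree can be evaluated.
  def prepareForEval(expr: Expression): Expression = expr.transform {
    case r: RuntimeReplaceable => r.replacement
  }
}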
For anyone who is interested: I tried to fix this by getting the expression from the optimized plan first. For some bizarre reason, this made tests fail for Decimal columns. I found that the writer was writing min/max stats with rather large values; these would later get dropped by the DataSkippingReader because they would not fit in a Decimal(3,2).
constructor.newInstance(
  newIncrementalExecution,
  ExpressionEncoder(newIncrementalExecution.analyzed.schema)).asInstanceOf[DataFrame]
DataFrameUtils.ofRows(newIncrementalExecution)
I am not sure why the previous code was using reflection to do this.
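For context, a hedged sketch of what such a util can look like on the Spark 3.5 API layout (the real DataFrameUtils in Delta may accept a QueryExecution and live elsewhere): Dataset.ofRows is package-private to org.apache.spark.sql, so a tiny wrapper declared in that package avoids reflection.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical wrapper: expose the package-private Dataset.ofRows so callers
// outside org.apache.spark.sql can build a DataFrame from a logical plan.
object DataFrameUtilsSketch {
  def ofRows(spark: SparkSession, plan: LogicalPlan): DataFrame =
    Dataset.ofRows(spark, plan)
}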
@@ -227,7 +227,9 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests {
  tableHasDVs = true,
  targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition),
  candidateFiles = corruptedFiles,
  condition = condition.expr
  condition = condition.expr,
  fileNameColumnOpt = Option(col("s._metadata.file_name")),
To the reviewer. I think this is correct, but I am not 100% sure.
For context: both sides of the join have a matching metadata field now. Using _metadata.file_name or _metadata.row_index will now fail with an AMBIGUOUS_REFERENCE exception.
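A small sketch of the disambiguation (DataFrame and condition names taken from the test above): with _metadata present on both sides, the reference has to be qualified via the alias.

import org.apache.spark.sql.functions.col

// Both "s" and "t" now carry a _metadata struct, so an unqualified
// col("_metadata.file_name") would fail with AMBIGUOUS_REFERENCE.
val joined   = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition)
val fileName = col("s._metadata.file_name")  // qualify through the source alias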
  val clonedSession = cloneMethod.invoke(spark).asInstanceOf[SparkSession]
  clonedSession
}
def cloneSession(spark: SparkSession): SparkSession = spark.cloneSession()
I am not sure why we need these gymnastics. As long as the test is defined in org.apache.spark.. you can call SparkSession.cloneSession() directly.
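A hedged sketch of why that works (package name illustrative; visibility as I understand the Spark sources): cloneSession() is private[sql], so code declared under org.apache.spark.sql can call it directly.

package org.apache.spark.sql.delta.test  // any package under org.apache.spark.sql works

import org.apache.spark.sql.SparkSession

object SessionCloneSketch {
  // Compiles only because this object lives under org.apache.spark.sql,
  // the visibility scope of the private[sql] cloneSession().
  def cloneSession(spark: SparkSession): SparkSession = spark.cloneSession()
}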
private def checkShowColumns(schema1: String, schema2: String, e: AnalysisException): Unit = {
  val expectedMessage = Seq(
    s"SHOW COLUMNS with conflicting databases: '$schema1' != '$schema2'", // SPARK-3.5
I didn't want to add shims for this. It is just an error message that, in both cases, is IMO perfectly understandable.
@@ -46,7 +46,7 @@ trait DeltaHiveTest extends SparkFunSuite with BeforeAndAfterAll { self: DeltaSQ
  _sc = new SparkContext("local", this.getClass.getName, conf)
  _hiveContext = new TestHiveContext(_sc)
  _session = _hiveContext.sparkSession
  SparkSession.setActiveSession(_session)
  setActiveSession(_session)
The use of relocated here is a bit weird. We should be able to set the session.
      DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark)
    assert(exceptionWithoutContext.getMessage.contains("https") === false)
  }
val newSession = spark.newSession()
Create a new session so we don't have to clean up.
class DeltaStructuredLoggingSuite extends DeltaStructuredLoggingSuiteBase {
  override def className: String = classOf[DeltaStructuredLoggingSuite].getSimpleName
  override def logFilePath: String = "target/structured.log"

  override def beforeAll(): Unit = {
    super.beforeAll()
    Logging.enableStructuredLogging()
Structured logging is not enabled by default (anymore), so we have to enable it.
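Not part of the diff, but a sketch of the symmetric teardown such a suite would typically pair with this (an assumption that the suite should restore plain logging afterwards):

import org.apache.spark.internal.Logging

// Sketch only: mirror the enableStructuredLogging() call from beforeAll.
override def afterAll(): Unit = {
  try {
    Logging.disableStructuredLogging()
  } finally {
    super.afterAll()
  }
}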
@@ -200,7 +200,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
  scalaVersion := scala213,
  crossScalaVersions := Seq(scala213),
  targetJvm := "17",
  resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
Not needed anymore. Removing it to avoid confusion.
  ex2,
  "Missing field V2",
  "Couldn't resolve positional argument AFTER V2",
  "Renaming column is not supported in Hive-style ALTER COLUMN, " +
I am not sure if this is expected.
Which Delta project/connector is this regarding?
Description
This PR fixes the master build for Delta Spark.
The fixes were needed for a number of unrelated changes in Apache Spark:
I opted to do this in one PR because that is the only way I can be sure the tests pass.
How was this patch tested?
Existing tests.
Does this PR introduce any user-facing changes?
No.