
Fix Spark Master build #3928

Open · wants to merge 48 commits into base: master
Conversation

@hvanhovell (Contributor) commented Dec 5, 2024

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

This PR fixes the master build for Delta Spark.

The fixes address a number of unrelated changes in Apache Spark:

  • Unified Scala SQL Interface
  • Aggregate adds a hint parameter in master.
  • TableSpec adds a collation parameter in master.
  • Condition and errorClass are swapped in the SparkThrowable framework (see the sketch after this list).
  • ...
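On the SparkThrowable point, a hedged sketch of the rename involved (illustrative only, not this PR's shim code): the identifier that Spark 3.5 exposes as an "error class" is exposed as an "error condition" on master, where getErrorClass is deprecated.

import org.apache.spark.SparkThrowable

// Illustrative helper (an assumption, not part of this PR): both accessors
// return the same identifier string; only the name changed between versions.
def errorIdentifier(t: SparkThrowable): String =
  t.getCondition // on Spark 3.5 this would be t.getErrorClass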

I opted to do this in one PR because that is the only way I can be sure the tests pass.

How was this patch tested?

Existing tests.

Does this PR introduce any user-facing changes?

No.

object DeltaThrowableHelperShims {
  /**
   * Handles a breaking change (SPARK-46810) between Spark 3.5 and Spark Master (4.0) where
   * `error-classes.json` was renamed to `error-conditions.json`.
   */
  val SPARK_ERROR_CLASS_SOURCE_FILE = "error/error-conditions.json"
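As a usage illustration (an assumption, not code from this PR), the constant is the classpath location of Spark's error definitions, so a loader can stay version-agnostic:

// Illustrative only: resolve the error definition file that ships with Spark,
// using the shim constant so the path is correct on both 3.5 and master.
val errorFileUrl = Thread.currentThread().getContextClassLoader
  .getResource(DeltaThrowableHelperShims.SPARK_ERROR_CLASS_SOURCE_FILE)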

def showColumnsWithConflictDatabasesError(
@hvanhovell (author):
Needed because of apache/spark@53c1f31

@@ -72,8 +72,7 @@ import org.apache.spark.sql.types._
  * A SQL parser that tries to parse Delta commands. If failing to parse the SQL text, it will
  * forward the call to `delegate`.
  */
-class DeltaSqlParser(val delegateSpark: ParserInterface) extends ParserInterfaceShims {
-  private val delegate = ParserInterfaceShims(delegateSpark)
+class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
@hvanhovell (author):
Needed because of apache/spark@8791767

def unapply(plan: Aggregate): Option[TahoeLogFileIndex] = {
  // GROUP BY is not supported. All AggregateExpressions must be stats optimizable.
  ... isStatsOptimizable(aggExprs) => Some(fileIndex)
  case _ => None
@hvanhovell (author):
Needed because of apache/spark@d6b7334
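Since Aggregate gained an extra constructor parameter on master (the hint), positional extractors written against 3.5 no longer compile. A hedged sketch of the version-tolerant pattern (illustrative, not this PR's exact code): match on the node type and read fields by name instead of by position.

import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

// Matching by type and accessing fields by name stays source-compatible
// when a constructor parameter is added to Aggregate.
def isGlobalAggregate(plan: LogicalPlan): Boolean = plan match {
  case a: Aggregate => a.groupingExpressions.isEmpty
  case _ => false
}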

@@ -182,7 +182,7 @@ class DeltaJobStatisticsTracker(
   override def newTaskInstance(): WriteTaskStatsTracker = {
     val rootPath = new Path(rootUri)
     val hadoopConf = srlHadoopConf.value
-    new DeltaTaskStatisticsTracker(dataCols, statsColExpr, rootPath, hadoopConf)
+    new DeltaTaskStatisticsTracker(dataCols, prepareForEval(statsColExpr), rootPath, hadoopConf)
@hvanhovell (author):
to_json is RuntimeReplaceable now. We need to replace it before we try to execute it. This was the narrowest waist I could find.
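A hedged sketch of what a helper like prepareForEval can do (the name comes from the diff above; the body is an assumption, not the PR's exact code): run the expression through the optimizer rule that rewrites RuntimeReplaceable expressions into their executable form.

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}

// Wrap the expression in a dummy Project so ReplaceExpressions (the rule that
// rewrites RuntimeReplaceable expressions such as to_json) can process it,
// then unwrap the rewritten expression.
def prepareForEval(expr: Expression): Expression = {
  val dummy = Project(Seq(Alias(expr, "expr")()), LocalRelation())
  ReplaceExpressions(dummy).asInstanceOf[Project]
    .projectList.head.asInstanceOf[Alias].child
}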

@hvanhovell (author):

For anyone who is interested: I tried to fix this by getting the expression from the optimized plan first. For some bizarre reason this made tests fail for Decimal columns. I found that the writer was writing min/max values with rather large precision; these would later get dropped by the DataSkippingReader because they would not fit in a Decimal(3,2).
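To illustrate the failure mode (a minimal sketch with assumed values, not taken from the PR): a statistic whose precision exceeds the column's declared Decimal(3,2) cannot be stored, so data skipping drops it.

import org.apache.spark.sql.types.Decimal

// changePrecision returns false when the value does not fit the target
// precision/scale; 12.345 rounds to 12.35, which needs 4 digits of
// precision, while Decimal(3,2) allows only 3.
val stat = Decimal("12.345")
assert(!stat.changePrecision(3, 2))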

-    constructor.newInstance(
-      newIncrementalExecution,
-      ExpressionEncoder(newIncrementalExecution.analyzed.schema)).asInstanceOf[DataFrame]
+    DataFrameUtils.ofRows(newIncrementalExecution)
@hvanhovell (author):
I am not sure why the previous code was using reflection to do this.
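A hedged sketch of what a helper like DataFrameUtils.ofRows can look like (the name mirrors the diff; the body is an assumption, not the PR's exact code): Dataset's constructor is private[sql], so a forwarder placed in the org.apache.spark.sql package can build the DataFrame without reflection.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.QueryExecution

object DataFrameUtils {
  // Builds a DataFrame directly from an existing QueryExecution; legal here
  // because this object lives inside the org.apache.spark.sql package.
  def ofRows(qe: QueryExecution): DataFrame =
    new Dataset[Row](qe, ExpressionEncoder(qe.analyzed.schema))
}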

@@ -227,7 +227,9 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests {
       tableHasDVs = true,
       targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition),
       candidateFiles = corruptedFiles,
-      condition = condition.expr
+      condition = condition.expr,
+      fileNameColumnOpt = Option(col("s._metadata.file_name")),
@hvanhovell (author):
To the reviewer: I think this is correct, but I am not 100% sure.

@hvanhovell (author):
For context: both sides of the join now have a matching metadata field. An unqualified _metadata.file_name or _metadata.row_index now fails with an AMBIGUOUS_REFERENCE exception.
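A minimal illustration of the ambiguity (assuming the DataFrames and condition from the diff above; not code from this PR):

import org.apache.spark.sql.functions.col

// Both join inputs expose the hidden _metadata struct, so the reference
// must be qualified through a dataset alias to be resolvable.
val joined = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition)
joined.select(col("s._metadata.file_name"))  // OK: qualified via the "s" alias
// joined.select(col("_metadata.file_name")) // fails: AMBIGUOUS_REFERENCE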

-    val clonedSession = cloneMethod.invoke(spark).asInstanceOf[SparkSession]
-    clonedSession
-  }
+  def cloneSession(spark: SparkSession): SparkSession = spark.cloneSession()
@hvanhovell (author):
I am not sure why we need these gymnastics. As long as the test is defined under org.apache.spark you can call SparkSession.cloneSession() directly.


private def checkShowColumns(schema1: String, schema2: String, e: AnalysisException): Unit = {
  val expectedMessage = Seq(
    s"SHOW COLUMNS with conflicting databases: '$schema1' != '$schema2'", // SPARK-3.5
@hvanhovell (author):
I didn't want to add shims for this. It is just an error message that, in both cases, is IMO perfectly understandable.

@@ -46,7 +46,7 @@ trait DeltaHiveTest extends SparkFunSuite with BeforeAndAfterAll { self: DeltaSQ
     _sc = new SparkContext("local", this.getClass.getName, conf)
     _hiveContext = new TestHiveContext(_sc)
     _session = _hiveContext.sparkSession
-    SparkSession.setActiveSession(_session)
+    setActiveSession(_session)
@hvanhovell (author):
The use of the relocated class here is a bit weird. We should be able to set the session directly.

     DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark)
     assert(exceptionWithoutContext.getMessage.contains("https") === false)
   }
+  val newSession = spark.newSession()
@hvanhovell (author):
Create a new session so we don't have to clean up.

class DeltaStructuredLoggingSuite extends DeltaStructuredLoggingSuiteBase {
  override def className: String = classOf[DeltaStructuredLoggingSuite].getSimpleName
  override def logFilePath: String = "target/structured.log"

  override def beforeAll(): Unit = {
    super.beforeAll()
    Logging.enableStructuredLogging()
@hvanhovell (author):
Structured logging is not enabled by default (anymore), so we have to enable it.
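For background, a hedged sketch of the toggles involved (assuming the Logging companion object on Spark master; illustrative, not this PR's code):

import org.apache.spark.internal.Logging

// On master, JSON structured logging is opt-in; suites that assert on the
// log format switch it on in beforeAll and off again in afterAll.
Logging.enableStructuredLogging()
assert(Logging.isStructuredLoggingEnabled)
Logging.disableStructuredLogging()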

@@ -200,7 +200,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
     scalaVersion := scala213,
     crossScalaVersions := Seq(scala213),
     targetJvm := "17",
-    resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
@hvanhovell (author):
Not needed anymore. Removing it to avoid confusion.

    ex2,
    "Missing field V2",
    "Couldn't resolve positional argument AFTER V2",
    "Renaming column is not supported in Hive-style ALTER COLUMN, " +
@hvanhovell (author):
I am not sure if this is expected.

@raveeram-db requested a review from scottsand-db on March 3, 2025 17:38