Fix reflection/to_json
hvanhovell committed Feb 14, 2025
1 parent 312eb6a commit fbaa773
Showing 6 changed files with 18 additions and 18 deletions.
7 changes: 6 additions & 1 deletion spark/src/main/scala-spark-3.5/shims/DataFrameShims.scala
@@ -15,9 +15,14 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 object DataFrameUtils {
   def ofRows(spark: SparkSession, plan: LogicalPlan): DataFrame = Dataset.ofRows(spark, plan)
+  def ofRows(queryExecution: QueryExecution): DataFrame = {
+    val ds = new Dataset(queryExecution, Encoders.row(queryExecution.analyzed.schema))
+    ds.asInstanceOf[DataFrame]
+  }
 }
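The new overload wraps an already-constructed QueryExecution in a DataFrame using only Encoders.row, so callers no longer have to reflect on Dataset's private constructor. A minimal caller-side sketch (the roundTrip helper is hypothetical, not part of the commit):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.DataFrameUtils

def roundTrip(spark: SparkSession, df: DataFrame): DataFrame = {
  // Existing overload: rebuild a DataFrame from a logical plan.
  val fromPlan = DataFrameUtils.ofRows(spark, df.queryExecution.logical)
  // New overload: wrap an already-built QueryExecution directly.
  DataFrameUtils.ofRows(fromPlan.queryExecution)
}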
[changed file: path not shown]
@@ -16,10 +16,9 @@
 
 package org.apache.spark.sql.delta
 
-import org.apache.spark.sql.{Dataset => DatasetImpl, SparkSession => SparkSessionImpl, DataFrameWriter => DataFrameWriterImpl}
+import org.apache.spark.sql.{SparkSession => SparkSessionImpl, DataFrameWriter => DataFrameWriterImpl}
 
 object Relocated {
-  type Dataset[E] = DatasetImpl[E]
   type SparkSession = SparkSessionImpl
   def setActiveSession(session: SparkSession): Unit = SparkSessionImpl.setActiveSession(session)
   val dataFrameWriterClassName = classOf[DataFrameWriterImpl[_]].getCanonicalName
7 changes: 6 additions & 1 deletion spark/src/main/scala-spark-master/shims/DataFrameShims.scala
@@ -15,11 +15,16 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.execution.QueryExecution
 
 object DataFrameUtils {
   def ofRows(spark: SparkSession, plan: LogicalPlan): DataFrame = Dataset.ofRows(spark, plan)
+  def ofRows(queryExecution: QueryExecution): DataFrame = {
+    val ds = new Dataset(queryExecution, Encoders.row(queryExecution.analyzed.schema))
+    ds.asInstanceOf[DataFrame]
+  }
 }
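Both shims type the wrapped rows the same way: Encoders.row derives a Row encoder from the analyzed plan's schema, and it is available on both the Spark 3.5 and master branches, which is what lets the shims avoid reflection. A small illustrative sketch of that piece (the schema below is made up):

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Build a Row encoder directly from a schema, as the shims do with queryExecution.analyzed.schema.
val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val rowEncoder: Encoder[Row] = Encoders.row(schema)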
[changed file: path not shown]
@@ -16,10 +16,9 @@
 
 package org.apache.spark.sql.delta
 
-import org.apache.spark.sql.classic.{Dataset => DatasetImpl, SparkSession => SparkSessionImpl, DataFrameWriter => DataFrameWriterImpl}
+import org.apache.spark.sql.classic.{SparkSession => SparkSessionImpl, DataFrameWriter => DataFrameWriterImpl}
 
 object Relocated {
-  type Dataset[E] = DatasetImpl[E]
   type SparkSession = SparkSessionImpl
   def setActiveSession(session: SparkSession): Unit = SparkSessionImpl.setActiveSession(session)
   val dataFrameWriterClassName = classOf[DataFrameWriterImpl[_]].getCanonicalName
[changed file: path not shown]
@@ -143,7 +143,7 @@ object IdentityColumn extends DeltaLogging {
     // Resolve the collection expression by constructing a query to select the expression from a
     // table with the statsSchema and get the analyzed expression.
     val resolvedPlan = DataFrameUtils.ofRows(spark, LocalRelation(statsDataSchema))
-      .select(unresolvedExpr).queryExecution.analyzed
+      .select(unresolvedExpr).queryExecution.optimizedPlan
 
     // We have to use the new attributes with regenerated attribute IDs, because the Analyzer
     // doesn't guarantee that attribute IDs will stay the same
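The pattern here resolves an expression against a schema without reading any data: select it over a zero-row LocalRelation that only carries the attributes, run the query through the planner, and pull the resolved expression back out of the plan. A hedged sketch of that idea (the resolveExpr helper is hypothetical, reads the analyzed plan, and assumes it is a simple Project, whereas the code above now reads the optimized plan):

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.delta.DataFrameUtils

def resolveExpr(spark: SparkSession, output: Seq[Attribute], expr: Column): NamedExpression = {
  // Select the unresolved expression over an empty relation that carries only the schema.
  val analyzed = DataFrameUtils.ofRows(spark, LocalRelation(output))
    .select(expr).queryExecution.analyzed
  // Assumes the analyzed plan is a Project directly over the LocalRelation.
  analyzed.asInstanceOf[Project].projectList.head
}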
[changed file: path not shown]
@@ -20,10 +20,9 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.delta.Relocated.Dataset
+import org.apache.spark.sql.delta.DataFrameUtils
 
-import org.apache.spark.sql.{Column, DataFrame, Encoder}
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, IncrementalExecutionShims, StreamExecution}
 
@@ -45,13 +44,6 @@ object DeltaStreamUtils {
         newMicroBatch.queryExecution.logical,
         incrementalExecution)
       newIncrementalExecution.executedPlan // Force the lazy generation of execution plan
-
-
-      // Use reflection to call the private constructor.
-      val constructor =
-        classOf[Dataset[_]].getConstructor(classOf[QueryExecution], classOf[Encoder[_]])
-      constructor.newInstance(
-        newIncrementalExecution,
-        ExpressionEncoder(newIncrementalExecution.analyzed.schema)).asInstanceOf[DataFrame]
+      DataFrameUtils.ofRows(newIncrementalExecution)
     }
   }
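The removed block reflected into Dataset's private constructor; the commit replaces that with the DataFrameUtils shim call, keeping the version-specific construction next to the rest of the per-branch shim code. An illustrative check of what the shim now hides (the local SparkSession, the print statement, and the expected class names are assumptions, not part of the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.delta.DataFrameUtils

val spark = SparkSession.builder().master("local[1]").appName("shim-check").getOrCreate()
// Which concrete Dataset class backs the DataFrame is now a detail of the per-branch shim.
val df = DataFrameUtils.ofRows(spark, OneRowRelation())
// Expected (assumption): org.apache.spark.sql.Dataset on the Spark 3.5 build and
// org.apache.spark.sql.classic.Dataset on the Spark master build.
println(df.getClass.getName)
spark.stop()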
