Skip to content

Commit

Permalink
[Spark] Use encoderFor to copy encoders for DeltaUDF (delta-io#3562)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This Spark commit
apache/spark@67d2888
breaks Spark master compilation (specifically [this
diff](apache/spark@67d2888#diff-7852549cf1376f95414d804c8a9382a236179582cb4f03f836c284b0c1a81191L93-R99)).

## How was this patch tested?

Existing tests suffice.
  • Loading branch information
allisonport-db authored Aug 16, 2024
1 parent 18eb1a6 commit 09aa7a5
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction}
import org.apache.spark.sql.functions.udf

Expand Down Expand Up @@ -81,8 +82,8 @@ object DeltaUDF {
orElse: => UserDefinedFunction): UserDefinedFunction = {
if (SparkSession.active.sessionState.conf
.getConf(DeltaSQLConf.INTERNAL_UDF_OPTIMIZATION_ENABLED)) {
val inputEncoders = template.inputEncoders.map(_.map(_.copy()))
val outputEncoder = template.outputEncoder.map(_.copy())
val inputEncoders = template.inputEncoders.map(_.map(e => encoderFor(e)))
val outputEncoder = template.outputEncoder.map(e => encoderFor(e))
template.copy(f = f, inputEncoders = inputEncoders, outputEncoder = outputEncoder)
} else {
orElse
Expand Down

0 comments on commit 09aa7a5

Please sign in to comment.