-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39313][SQL] toCatalystOrdering
should fail if V2Expression can not be translated
#36697
Conversation
8a560e8
to
35e9bdb
Compare
cc @cloud-fan |
val exc = intercept[AnalysisException] { | ||
V2ExpressionUtils.toCatalystOrdering( | ||
Array(supportedV2Sort, unsupportedV2Sort), | ||
LocalRelation.apply(AttributeReference("a", StringType)())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current master/3.3 return Seq.empty
instead of throwing AnalysisException
My change breaks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @pan3793 .
@@ -40,7 +40,7 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper { | |||
|
|||
val catalystPartitioning = scan.outputPartitioning() match { | |||
case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map( | |||
V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt))) | |||
V2ExpressionUtils.toCatalyst(_, relation, false, funCatalogOpt))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind, shall we use the following style, @pan3793 ?
- V2ExpressionUtils.toCatalyst(_, relation, false, funCatalogOpt)))
+ V2ExpressionUtils.toCatalyst(_, relation, strict = false, funCatalogOpt)))
import org.apache.spark.sql.connector.expressions._ | ||
import org.apache.spark.sql.types.StringType | ||
|
||
class V2ExpressionUtilsSuite extends SparkFunSuite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V2ExpressionUtils
is package org.apache.spark.sql.catalyst.expressions
while this V2ExpressionUtilsSuite
is in package org.apache.spark.sql.execution.datasources.v2
.
If this test suite depends on another stuffs (outside org.apache.spark.sql.catalyst.expressions
package), we had better avoid creating a new V2ExpressionUtilsSuite
here. If you don't mind, shall we move this test case into another suite?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion, the current changes are not the final solution, I will make the following changes as suggested.
toCatalystOrdering
should fail if V2Expression can not be translated
Thanks to @sunchao for the confirmation, since I'm trying to get V2Write to support V2Function, but it seems more complicated than I thought at first. Would you please give me some suggestions? |
Sure. Let me check. |
If so, I will fail RC3. |
@pan3793 this commit should help to fix the tests. |
Thanks, seems my approach is too overkill to fix it |
I think that can be a separate PR for master only to support function catalog on the write path. |
Thanks @sunchao and @dongjoon-hyun, changed as suggested. |
The change breaks "SPARK-30289 Create: partitioned by nested column" |
@pan3793 hmm how? that change is test only and I don't see any test failure in the latest CI run |
In |
@@ -32,15 +32,15 @@ import org.apache.spark.util.collection.Utils.sequenceToOption | |||
*/ | |||
object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper { | |||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { | |||
case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, _) => | |||
case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's sufficient to only match keyGroupedPartitioning = None?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After second thought, I think it should only match None
here, otherwise catalystPartitioning
will be calculated every round, if the generated catalystPartitioning
contains Alias
, will cause !plan.fastEquals(reOptimized)
and fail checkBatchIdempotence
.
"SPARK-30289 Create: partitioned by nested column" happen to cover this case after we changed the InMemoryTable#outputPartitioning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it's better to match None
here.
CI is green, please take another look @dongjoon-hyun |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Hmm thinking more about this, I think maybe we should completely retain the previous behavior and only allow identity transform and fail even if a V2 transform exist in the function catalog. Otherwise, the write may fail at a later stage. |
|
Can one of the admins verify this patch? |
} | ||
|
||
def toCatalyst( | ||
expr: V2Expression, | ||
query: LogicalPlan, | ||
strict: Boolean = true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of adding a new parameter, I think it's clearer and more type-safe to have 2 methods
def toCatalystOpt ... : Option[Expression]
def toCatalyst(...): Expression = toCatalystOpt(...).getOrElse(throw error)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, updated
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
Outdated
Show resolved
Hide resolved
val funCatalogOpt = relation.catalog.flatMap { | ||
case c: FunctionCatalog => Some(c) | ||
case _ => None | ||
} | ||
|
||
val catalystPartitioning = scan.outputPartitioning() match { | ||
case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map( | ||
V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt))) | ||
V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should also fail here. If a data source uses an invalid partitioning, we should fail the query and let users know, so that they can debug and fix the data source. Otherwise, users may live with a performance bug for a while as it's hard to figure out where the problem is. cc @sunchao
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, w/o error messages the user must infer the issue by query plan, but I think it's a little bit aggressive to fail the query because it's only a performance issue but not for correctness, how about adding warning logs here? ping @sunchao
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also a bit more inclined to use warning messages here. To debug performance issue, it should be relatively straightforward to check out the query plan and see whether there's any shuffle in it, combined with checking query logs. In addition, existing V2 data source tables could already have custom transforms such as bucket, days, etc, and we don't want queries against these tables start to fail (because there is no function catalog) after upgrading to Spark 3.3.
There is already warning message when function catalog exists but the transform function doesn't, in the method loadV2FunctionOpt
. It seems we don't have warning message for the case when a custom transform function is used but there is no function catalog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with the updated changes.
Merged to master/3.3, thanks! |
…an not be translated After reading code changes in #35657, I guess the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` is, for reading, spark can ignore unrecognized distribution and ordering, but for writing, it should always be strict. Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if V2Expression can not be translated instead of returning empty Seq. `V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`, the current behavior will break the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases(see UT). No. New UT. Closes #36697 from pan3793/SPARK-39313. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Chao Sun <[email protected]>
Thank you, @pan3793 , @sunchao , @cloud-fan ! |
What changes were proposed in this pull request?
After reading code changes in #35657, I guess the original intention of changing the return type of
V2ExpressionUtils.toCatalyst
fromExpression
toOption[Expression]
is, for reading, spark can ignore unrecognized distribution and ordering, but for writing, it should always be strict.Specifically,
V2ExpressionUtils.toCatalystOrdering
should fail if V2Expression can not be translated instead of returning empty Seq.Why are the changes needed?
V2ExpressionUtils.toCatalystOrdering
is used byDistributionAndOrderingUtils
, the current behavior will break the semantics ofRequiresDistributionAndOrdering#requiredOrdering
in some cases(see UT).Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT.