-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor #63
Conversation
Can one of the admins verify this patch? |
Jenkins, add to whitelist. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
One or more automated tests failed |
Merged build triggered. |
Merged build started. |
Merged build finished. |
All automated tests passed. |
@@ -21,4 +21,6 @@ package org.apache.spark.scheduler | |||
* Represents free resources available on an executor. | |||
*/ | |||
private[spark] | |||
class WorkerOffer(val executorId: String, val host: String, val cores: Int) | |||
class WorkerOffer(val executorId: String, val host: String, var cores: Int) { | |||
@transient val totalcores = cores |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be transient? also use camelcase for naming (totalCores)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually on second thought, can CoarseGrainedSchedulerBackend just store the total cores for each worker in a hash map? I'd prefer that solution since other classes use WorkerOffer and don't use it to keep track of the total cores on each worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 I'd also like to see WorkerOffer remain more like an immutable message type, with derived, mutable structures created only locally within the implementations that need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On that note, it seems to me that WorkerOffer should just be a case class, since all the constructor parameters are public vals anyway.
Merged build triggered. |
Merged build started. |
Merged build finished. |
All automated tests passed. |
@kayousterhout @markhamstra @andrewor14 Thank you for your comments, I updated the code, how about this? |
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) | ||
// reconstruct workerOffers | ||
workerOffers.foreach(o => workerOffers(o._1) = | ||
new WorkerOffer(o._1, o._2.host, freeCores(o._1))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that WorkerOffer is a case class, you can do this and the one in makeOffers with the copy idiom:
workerOffers.keys.foreach { executorId =>
workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
}
Merged build triggered. |
Merged build started. |
Merged build finished. |
All automated tests passed. |
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler | |||
* Represents free resources available on an executor. | |||
*/ | |||
private[spark] | |||
class WorkerOffer(val executorId: String, val host: String, val cores: Int) | |||
case class WorkerOffer(executorId: String, host: String, cores: Int); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
superfluous ';'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, sorry, fixed
Merged build triggered. |
Merged build started. |
This new version of the change doesn't look any simpler to me than the current version of the code and I think is a slightly confusing way of using worker offers to store info about the executors. Can you just remove executorAddress, the unused variable, and fix the bug, but keep the original way of generating worker offers? |
Merged build finished. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
All automated tests passed. |
How about this? |
This looks good -- I've merged this into master. |
@kayousterhout Thank you very much! |
Fixing spark streaming example and a bug in examples build. - Examples assembly included a log4j.properties which clobbered Spark's - Example had an error where some classes weren't serializable - Did some other clean-up in this example (cherry picked from commit 28e9c2a) Signed-off-by: Patrick Wendell <[email protected]>
…ead of just freeCores on that executor https://spark-project.atlassian.net/browse/SPARK-1171 When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores... Author: CodingCat <[email protected]> Author: Nan Zhu <[email protected]> Closes apache#63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits: f6bf93f [Nan Zhu] code clean 19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers 43c13e9 [CodingCat] keep WorkerOffer immutable af470d3 [CodingCat] style fix 0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
Catch up with branch-1.4 bug fixes and bump jersey
"Fix" timestamp parsing and update spark.
Implement a Go Spark CLI
## What changes were proposed in this pull request? In DataSourceV2Strategy, it seems we eliminate the subqueries by mistake after normalizing filters. We have a sql with a scalar subquery: ``` scala val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)") plan.explain(true) ``` And we get the log info of DataSourceV2Strategy: ``` Pushing operators to csv:examples/src/main/resources/t2.txt Pushed Filters: Post-Scan Filters: isnotnull(t2a#30) Output: t2a#30, t2b#31 ``` The `Post-Scan Filters` should contain the scalar subquery, but we eliminate it by mistake. ``` == Parsed Logical Plan == 'Project [*] +- 'Filter ('t2a > scalar-subquery#56 []) : +- 'Project [unresolvedalias('max('t1a), None)] : +- 'UnresolvedRelation `t1` +- 'UnresolvedRelation `t2` == Analyzed Logical Plan == t2a: string, t2b: string Project [t2a#30, t2b#31] +- Filter (t2a#30 > scalar-subquery#56 []) : +- Aggregate [max(t1a#13) AS max(t1a)#63] : +- SubqueryAlias `t1` : +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt +- SubqueryAlias `t2` +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt == Optimized Logical Plan == Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 [])) : +- Aggregate [max(t1a#13) AS max(t1a)#63] : +- Project [t1a#13] : +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt == Physical Plan == *(1) Project [t2a#30, t2b#31] +- *(1) Filter isnotnull(t2a#30) +- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan ``` ## How was this patch tested? ut Closes #24321 from francis0407/SPARK-27411. Authored-by: francis0407 <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
* Modify existed Partioning & Distribution to reduce shuffles for full outer join * Refactor and test
* Modify existed Partioning & Distribution to reduce shuffles for full outer join * Refactor and test
* Fix the opentelekomcloud job - Add the OS_VPC_ID environment variable - Change the OS_FLAVOR_ID and OS_FLAVOR_ID_RESIZE value - Enable all the tests running For apache#63 * Update run.yaml * change to use an ubuntu image for testing * add az environment variable for otc job
### What changes were proposed in this pull request? Push down filter through expand. For case below: ``` create table t1(pid int, uid int, sid int, dt date, suid int) using parquet; create table t2(pid int, vs int, uid int, csid int) using parquet; SELECT years, appversion, SUM(uusers) AS users FROM (SELECT Date_trunc('year', dt) AS years, CASE WHEN h.pid = 3 THEN 'iOS' WHEN h.pid = 4 THEN 'Android' ELSE 'Other' END AS viewport, h.vs AS appversion, Count(DISTINCT u.uid) AS uusers ,Count(DISTINCT u.suid) AS srcusers FROM t1 u join t2 h ON h.uid = u.uid GROUP BY 1, 2, 3) AS a WHERE viewport = 'iOS' GROUP BY 1, 2 ``` Plan. before this pr: ``` == Physical Plan == *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)]) +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251] +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)]) +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246] +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=#241] +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44] +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12] +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight :- *(2) Project [uid#7, dt#9, suid#10] : +- *(2) Filter isnotnull(uid#7) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=#233] +- *(1) Project [pid#11, vs#12, uid#13] +- *(1) Filter isnotnull(uid#13) +- *(1) ColumnarToRow +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` Plan. after. this pr. : ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L]) +- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=#71] +- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=#67] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=#63] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Project [uid#7, dt#9, pid#11, vs#12] +- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false :- Filter isnotnull(uid#7) : +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#58] +- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13)) +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` ### Why are the changes needed? Improve performance, filter more data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #30278 from AngersZhuuuu/SPARK-33302. Authored-by: angerszhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…join can be planned as broadcast join ### What changes were proposed in this pull request? Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases. ```scala spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1") spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2") spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain ``` Before this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65] : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` After this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` ### Why are the changes needed? 1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance. 2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test. SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- q14a | 660 | 594 q14b | 660 | 600 q38 | 55 | 29 q87 | 66 | 35 Before this pr:  After this pr:  Closes #31145 from wangyum/SPARK-34081. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…Anti over Aggregate if join can be planned as broadcast join ### What changes were proposed in this pull request? Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases. ```scala spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1") spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2") spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain ``` Before this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65] : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` After this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74] +- HashAggregate(keys=[a#16L, b#17L], functions=[]) +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61] : +- HashAggregate(keys=[a#16L, b#17L], functions=[]) : +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63] +- HashAggregate(keys=[c#18L, d#19L], functions=[]) +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint> ``` ### Why are the changes needed? 1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance. 2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test. SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- q14a | 660 | 594 q14b | 660 | 600 q38 | 55 | 29 q87 | 66 | 35 Before this pr:  After this pr:  Closes #31145 from wangyum/SPARK-34081. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit d3ea308)
### What changes were proposed in this pull request? Bump the version of Hive to 2.3.9.9-apple. This is cherry-picked from `branch-3.4.0-apple` Patch List: https://github.pie.apple.com/IPR/apache-hive/compare/7c8afd7624...99d3db4b46 ``` rdar://128626264 Fixes get_table call for older hms server 1.x version (apache#63) MINOR build: Add a bump-version.sh script to simplify releases ``` ### Why are the changes needed? Adds support for Hive 1.0 servers ### Does this PR introduce _any_ user-facing change? Yes. There is a new configuration option `hive.metastore.client.force.old.get.table`. ### How was this patch tested? The Hive changes were tested against a Hive 1.0 server. ### Was this patch authored or co-authored using generative AI tooling? No
https://spark-project.atlassian.net/browse/SPARK-1171
When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...