SPARK-1365 [HOTFIX] Fix RateLimitedOutputStream test #277
Conversation
How about just making sure it has a lower bound of 4?
Merged build triggered. Build is starting -or- tests failed to complete.
Merged build started. Build is starting -or- tests failed to complete.
Rate limiting was about setting an upper bound on throughput, and thus a lower bound on time.
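To make that concrete (the payload size and rate below are illustrative, not taken from this PR): capping throughput at N bytes/sec puts a hard floor of size/N seconds on how long a fixed-size write can take, but by itself no ceiling.
```scala
// Illustrative arithmetic only; the payload and rate are hypothetical.
val bytesWritten = 41000                 // size of the write in bytes
val capBytesPerSec = 10000               // rate limit (upper bound on throughput)
val minSeconds = bytesWritten.toDouble / capBytesPerSec
// minSeconds == 4.1: the write cannot finish sooner, but nothing in the
// rate limiter alone guarantees that it finishes at all.
```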
Merged build finished. Build is starting -or- tests failed to complete.
Build is starting -or- tests failed to complete.
@rxin True, but that's like saying an eventual-consistency implementation that never converges is okay. That is, we want to test that it isn't just a no-op stream that never passes data through.
@rxin I'll also add a conservative upper bound to make sure the data does eventually get through.
This test needs to be fixed. It currently depends on Thread.sleep() having exact-timing semantics, which is not a valid assumption.
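A minimal sketch of a more robust assertion, assuming a `RateLimitedOutputStream` that wraps an `OutputStream` with a bytes-per-second cap (the payload size, rate, and upper-bound constant below are illustrative, not the exact values in this PR): check a lower bound implied by the rate limit plus a generous upper bound, instead of an exact duration.
```scala
import java.io.ByteArrayOutputStream
import java.util.concurrent.TimeUnit.{NANOSECONDS, SECONDS}

// Bound the elapsed time on both sides rather than asserting an exact
// duration, since Thread.sleep() may oversleep by an arbitrary amount.
val underlying = new ByteArrayOutputStream
val data = "X" * 41000                                      // hypothetical 41000-byte payload
val stream = new RateLimitedOutputStream(underlying, 10000) // assumed cap: 10000 bytes/sec

val start = System.nanoTime
stream.write(data.getBytes("utf-8"))
stream.flush()
val elapsed = SECONDS.convert(System.nanoTime - start, NANOSECONDS)

// Lower bound: 41000 bytes at <= 10000 bytes/sec needs at least 4 full seconds.
assert(elapsed >= 4)
// Conservative upper bound: catches a no-op stream that never passes data
// through; the constant only needs to be generous (30 is arbitrary).
assert(elapsed <= 30)
// The data must actually arrive intact.
assert(underlying.toString("utf-8") == data)
```
The lower bound follows directly from the rate cap, while the upper bound encodes the "it must eventually get through" property discussed above.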
Merged build triggered. Build is starting -or- tests failed to complete.
Merged build started. Build is starting -or- tests failed to complete.
Merged build finished. All automated tests passed.
All automated tests passed.
This test needs to be fixed. It currently depends on Thread.sleep() having exact-timing semantics, which is not a valid assumption.

Author: Patrick Wendell <[email protected]>

Closes apache#277 from pwendell/rate-limited-stream and squashes the following commits:

6c0ff81 [Patrick Wendell] SPARK-1365: Fix RateLimitedOutputStream test