
SPARK-1365 [HOTFIX] Fix RateLimitedOutputStream test #277

Closed
wants to merge 1 commit from pwendell/rate-limited-stream

Conversation

@pwendell (Contributor)

This test needs to be fixed. It currently depends on Thread.sleep() having exact-timing semantics, which is not a valid assumption.
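
For context, a minimal sketch of the failure mode (illustrative Scala, not the actual suite code): Thread.sleep(ms) only guarantees *at least* ms of delay, so only a lower bound on elapsed time can safely be asserted.

```
val start = System.nanoTime()
Thread.sleep(1000)
val elapsedMs = (System.nanoTime() - start) / 1000000
// Fragile: sleep(1000) routinely overshoots under scheduler jitter.
// assert(elapsedMs == 1000)
// Robust: sleep()'s contract only guarantees the lower bound.
assert(elapsedMs >= 1000)
```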

@rxin (Contributor) commented Mar 31, 2014

How about just making sure it has a lower bound of 4 seconds?

@AmplabJenkins

Merged build triggered. Build is starting -or- tests failed to complete.

@AmplabJenkins

Merged build started. Build is starting -or- tests failed to complete.

@pwendell (Contributor, Author)

@rxin I didn't know whether that was what @tdas would want... we should probably try to upper-bound the test somehow, because this is a test of rate limiting.

@rxin (Contributor) commented Mar 31, 2014

Rate limiting is about setting an upper bound on throughput, and thus a lower bound on time.
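
Concretely (illustrative numbers, not the suite's actual parameters), capping throughput from above bounds elapsed time from below, while nothing bounds the time from above:

```
val bytesPerSec = 10000     // the rate limit: an upper bound on throughput
val totalBytes  = 40000     // data pushed through the stream
val minSeconds  = totalBytes.toDouble / bytesPerSec  // = 4.0, a lower bound on time
```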

@AmplabJenkins

Merged build finished. Build is starting -or- tests failed to complete.

@AmplabJenkins

Build is starting -or- tests failed to complete.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13598/

@pwendell (Contributor, Author)

@rxin true, but that's like saying an eventual-consistency implementation that never converges is okay. I.e., we want to test that it isn't just a no-op stream that never passes data through.

@pwendell (Contributor, Author)

@rxin I'll just add a conservative upper bound also to make sure it does eventually get through.
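
A minimal Scala sketch of that shape of test (the constructor signature, parameter values, and the 30-second slack are assumptions for illustration, not the committed code):

```
import java.io.ByteArrayOutputStream
import java.util.concurrent.TimeUnit.{NANOSECONDS, SECONDS}

// Write enough data that a 10,000 bytes/sec limit forces at least 4 seconds.
val data = "x" * 40000
val underlying = new ByteArrayOutputStream()
// Assumed constructor shape: RateLimitedOutputStream(out, desiredBytesPerSec).
val stream = new RateLimitedOutputStream(underlying, 10000)

val start = System.nanoTime()
stream.write(data.getBytes("UTF-8"))
stream.flush()
val elapsed = SECONDS.convert(System.nanoTime() - start, NANOSECONDS)

assert(elapsed >= 4)   // the guaranteed lower bound from the rate limit
assert(elapsed <= 30)  // conservative upper bound: data did eventually pass through
assert(underlying.toString("UTF-8") == data)
```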

@pwendell changed the title from "SPARK-1365 [HOTFIX] Disable RateLimitedOutputStream test" to "SPARK-1365 [HOTFIX] Fix RateLimitedOutputStream test" on Mar 31, 2014
@AmplabJenkins

Merged build triggered. Build is starting -or- tests failed to complete.

@AmplabJenkins

Merged build started. Build is starting -or- tests failed to complete.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13602/

@asfgit closed this in 33b3c2a on Mar 31, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
This test needs to be fixed. It currently depends on Thread.sleep() having exact-timing
semantics, which is not a valid assumption.

Author: Patrick Wendell <[email protected]>

Closes apache#277 from pwendell/rate-limited-stream and squashes the following commits:

6c0ff81 [Patrick Wendell] SPARK-1365: Fix RateLimitedOutputStream test
Igosuki pushed a commit to Adikteev/spark that referenced this pull request Jul 31, 2018
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
* MapR [SPARK-226] Spark - pySpark Security Vulnerability
peter-toth added a commit to peter-toth/spark that referenced this pull request Nov 26, 2024
…nt filters

This is a WIP version of apache#37630 at commit 83c59ab5e7e2abfaf83abe7ec418f30a5c7a41ea, but it introduces the `spark.cloudera.sql.advancedSubqueryMerge.enabled` config (default `true`) to disable the feature if needed.

After apache#32298 we were able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic to be able to combine `Filter` nodes with different conditions if those conditions can be merged in an ancestor `Aggregate` node.

Consider the following query with 2 subqueries:
```
SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(b) FROM t WHERE c = 2)
```
where the subqueries can be merged to:
```
SELECT
  avg(a) FILTER (WHERE c = 1),
  sum(b) FILTER (WHERE c = 2)
FROM t
WHERE c = 1 OR c = 2
```
After this PR the 2 subqueries are merged to this optimized form:
```
== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
:  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
:  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation
```
and physical form:
```
== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
:  :- Subquery scalar-subquery#260, [id=#148]
:  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
:  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
:  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
:  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :                    +- *(1) ColumnarToRow
:  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
:  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]
```

The optimization in this PR doesn't kick in if the filters (`c = 1`, `c = 2`) are partition or bucket filters (to avoid possible performance degradation), but allows merging pushed-down data filters depending on a new `spark.sql.planMerge.ignorePushedDataFilters` config value (default `true`).
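
For reference, toggling these from a Spark session would look like the following sketch (the config keys are taken from this commit message; the session setup is illustrative):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Both keys come from this commit message; both default to true.
spark.conf.set("spark.cloudera.sql.advancedSubqueryMerge.enabled", "true")
spark.conf.set("spark.sql.planMerge.ignorePushedDataFilters", "true")
```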

Performance improvement:
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off                   9526           9634          97          0.0   244257993.6       1.0X
[info] q9 - Merge different filters on                    3798           3881         133          0.0    97381735.1       2.5X
```
The performance improvement in the case of `q9` comes from merging 15 subqueries into 1 (apache#32298 was able to merge the 15 subqueries into 5).

No user-facing changes.

Tested with existing and new UTs.

Change-Id: Ibeab5772549660ed217707f9b7cdac39491bf096