
[SPARK-8978][Streaming] Implements the DirectKafkaRateController #18

Conversation

@huitseeker

This depends on #16

@huitseeker force-pushed the SPARK-8978 branch 5 times, most recently from b69ec1d to 1503e05, on July 15, 2015 at 00:34
protected def maxMessagesPerPartition: Option[Long] = {
  val estimatedRate = rateController.getLatestRate().toInt
  val effectiveRatePerSec = if (estimatedRate > 0) (ratePerSec min estimatedRate) else ratePerSec
  if (ratePerSec > 0) {
The estimated rate is per stream, not per Kafka partition (as the configuration parameter above is). Shouldn't that be accounted for here?
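For illustration, one way to account for this would be to spread the stream-level estimate evenly across the partitions of the batch before comparing it with the per-partition cap. A minimal sketch, not the PR's actual code: `currentOffsets` (assumed to hold the partitions this DStream consumes) and the per-batch conversion are assumptions; only `ratePerSec` and `rateController` come from the snippet above.

```scala
protected def maxMessagesPerPartition: Option[Long] = {
  // The rate controller estimates a rate for the whole stream, so divide it
  // across the partitions before comparing with the per-partition cap.
  val numPartitions = math.max(currentOffsets.keys.size, 1) // assumed field
  val estimatedRatePerPartition = (rateController.getLatestRate() / numPartitions).toInt
  val effectiveRatePerSec =
    if (estimatedRatePerPartition > 0) ratePerSec min estimatedRatePerPartition
    else ratePerSec
  if (effectiveRatePerSec > 0) {
    // Convert the per-second, per-partition rate into a per-batch message cap.
    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
    Some((secsPerBatch * effectiveRatePerSec).toLong)
  } else {
    None
  }
}
```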

@huitseeker force-pushed the SPARK-8978 branch 3 times, most recently from eccb008 to 2fa8a60, on July 15, 2015 at 11:16
@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/48/

Build Log (last 10 lines):

[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/18/merge^{commit} # timeout=10
Checking out Revision 0f753bd12042de2eab2dd3ac41b934716c795d17 (refs/remotes/origin/pr/18/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 0f753bd12042de2eab2dd3ac41b934716c795d17
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 63593b2e1c18eab652c677bfa005a120bfbce019 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/48/ and message: Merged build finished.

Test FAILed.

@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/49/

Build Log (last 10 lines):

[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/18/merge^{commit} # timeout=10
Checking out Revision 0f753bd12042de2eab2dd3ac41b934716c795d17 (refs/remotes/origin/pr/18/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 0f753bd12042de2eab2dd3ac41b934716c795d17
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of bb436a2381af2bc4033c90bb1697bc2aac3b1e44 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/49/ and message: Merged build finished.

Test FAILed.

@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/50/

Build Log (last 10 lines):

[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/18/merge^{commit} # timeout=10
Checking out Revision cd968115e85ebc13dfc6d4ff8c3dbc8622b464cf (refs/remotes/origin/pr/18/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f cd968115e85ebc13dfc6d4ff8c3dbc8622b464cf
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 73399ced62f6506de93e8f2ff53c6566b6d14ca3 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/50/ and message: Merged build finished.

Test FAILed.

@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/53/

Build Log (last 10 lines):

[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/18/merge^{commit} # timeout=10
Checking out Revision cd968115e85ebc13dfc6d4ff8c3dbc8622b464cf (refs/remotes/origin/pr/18/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f cd968115e85ebc13dfc6d4ff8c3dbc8622b464cf
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 0d4710bb9810dff1023487cd2804c2516ea126a8 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/53/ and message: Merged build finished.

Test FAILed.

@dragos force-pushed the topic/streaming-bp/dynamic-rate branch from 13ada97 to 0c51959, on July 20, 2015 at 14:45
@nraychaudhuri

ok to test

@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/56/

Build Log (last 10 lines):

[...truncated 18 lines...]
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 4939a0c1b9ab1d79e36a73f5c98b2ce16aac9b47
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark JDK-7 PV (i-a6d3f874)
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark-Ora-JDK7-PV
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 4318a97b5d56988c9ef9320376fde0f4e8e0da1c to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/56/ and message: Merged build finished.

Test FAILed.

@nraychaudhuri

ok to test

@typesafe-tools

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/57/

Build Log (last 10 lines):

[...truncated 16 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/18/merge^{commit} # timeout=10
Checking out Revision 4939a0c1b9ab1d79e36a73f5c98b2ce16aac9b47 (refs/remotes/origin/pr/18/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 4939a0c1b9ab1d79e36a73f5c98b2ce16aac9b47
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 4318a97b5d56988c9ef9320376fde0f4e8e0da1c to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/57/ and message: Merged build finished.

Test FAILed.

@dragos closed this on Sep 9, 2015
fdp-ci pushed a commit that referenced this pull request Aug 27, 2019
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in Spark.

Currently, the explain output for a physical plan can look very cluttered: each operator's
string representation can be very wide and wrap around in the display, making it hard to
follow. This happens especially when explaining a query that 1) operates on wide tables or
2) has complex expressions.

This PR splits the output into two sections. In the header section, we display the basic
operator tree, with a number associated with each operator, and we strictly control what is
printed for each operator. In the footer section, each operator is displayed verbosely.
Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not
included in the main plan; they are printed separately after the main plan and can be
correlated to it by the originating expression id from the parent plan.
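As a usage sketch of the new mode (an assumption on my part: the commit text only shows the resulting output and does not name the keyword; later Spark releases expose it as `EXPLAIN FORMATTED`):

```scala
// Hypothetical invocation of the two-section explain output from a SparkSession;
// the FORMATTED keyword is an assumption based on later Spark releases.
spark.sql("CREATE TABLE IF NOT EXISTS explain_temp1 (key INT, val INT) USING parquet")
spark.sql(
  "EXPLAIN FORMATTED " +
    "SELECT key, max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0"
).show(truncate = false)
```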

To illustrate, here is a simple plan displayed in the old and in the new format.

Example query 1:
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old:
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]
```

Example query 2 (subquery):
```
SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)apache#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)apache#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting
tomorrow and will not be able to incorporate feedback immediately, but I will start working
on it as soon as I can. Also, this PR currently provides only the basic infrastructure for
the explain enhancement; the details for individual operators will be implemented in
follow-up PRs.
## How was this patch tested?
Added a new test, `explain.sql`, that covers basic scenarios. More tests still need to be added.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>