
[SPARK-3030] [PySpark] Reuse Python worker #2259

Closed · wants to merge 14 commits

Conversation

@davies (Contributor) commented Sep 4, 2014

Reuse the Python worker to avoid the overhead of forking a Python process for each task. It also tracks which broadcasts each worker has already received, to avoid re-sending the same broadcast repeatedly.

This reduces the time for a dummy task from 22 ms to 13 ms (-40%), which helps reduce latency for Spark Streaming.

For a job with a broadcast variable (43 MB after compression):

    b = sc.broadcast(set(range(30000000)))
    print sc.parallelize(range(24000), 100).filter(lambda x: x in b.value).count()

It finishes in 281 s without worker reuse and in 65 s with worker reuse (4 CPUs). Reusing the worker saves roughly 9 seconds of broadcast transfer and deserialization per task.
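
As a rough illustration of the broadcast-tracking idea (not the actual Spark code; sent_broadcasts and broadcasts_to_send are hypothetical names):

    # Track which broadcasts each long-lived worker has already received, so a
    # reused worker only gets new broadcasts instead of the full set per task.
    sent_broadcasts = {}  # worker id -> set of broadcast ids already sent

    def broadcasts_to_send(worker_id, task_broadcast_ids):
        """Return only the broadcast ids this worker has not seen yet."""
        seen = sent_broadcasts.setdefault(worker_id, set())
        new_ids = [b for b in task_broadcast_ids if b not in seen]
        seen.update(new_ids)
        return new_ids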

Worker reuse is enabled by default and can be disabled by setting spark.python.worker.reuse = false.
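
A minimal sketch of disabling worker reuse from application code (the config key spark.python.worker.reuse is the one this patch adds; the rest of the setup is just illustrative):

    from pyspark import SparkConf, SparkContext

    # Fork a fresh Python worker for every task instead of reusing one.
    conf = SparkConf().set("spark.python.worker.reuse", "false")
    sc = SparkContext(conf=conf)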

@@ -50,7 +50,7 @@ echo "Running PySpark tests. Output is in python/unit-tests.log."

 # Try to test with Python 2.6, since that's the minimum version that we support:
 if [ $(which python2.6) ]; then
-    export PYSPARK_PYTHON="python2.6"
+    export PYSPARK_PYTHON="pypy"
Contributor

Looks like this change got pulled in by accident?

Contributor Author

yes:)

@SparkQA commented Sep 4, 2014

QA tests have started for PR 2259 at commit ace2917.

  • This patch merges cleanly.

@SparkQA commented Sep 4, 2014

QA tests have finished for PR 2259 at commit ace2917.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BlockManagerMaster(
    • class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])

@davies (Contributor Author) commented Sep 5, 2014

Jenkins, retest this please.

@davies (Contributor Author) commented Sep 6, 2014

Jenkins, test this please.

@SparkQA commented Sep 6, 2014

QA tests have started for PR 2259 at commit 583716e.

  • This patch merges cleanly.

@SparkQA commented Sep 6, 2014

QA tests have finished for PR 2259 at commit 583716e.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor Author) commented Sep 6, 2014

Jenkins, retest this please.

@JoshRosen (Contributor)

Jenkins, retest this please.

@JoshRosen (Contributor)

Do you think worker re-use should be enabled by default?

The only problem that I anticipate is for applications that share a single SparkContext with both Python and Scala processes; in these cases, the Python tasks may continue to hog resources (memory that's not used for caching RDDs) even after they complete. This seems like a rare use-case, though, so we could document this change and advise those users to disable this setting.

I'm inclined to have it on by default, since it will be a huge performance win for the vast majority of PySpark users.

@JoshRosen (Contributor)

It would be interesting to measure the end-to-end performance impact for more realistic jobs, especially ones that make use of large numbers of tasks and large broadcast variables.

@davies (Contributor Author) commented Sep 7, 2014

It's already enabled by default. I've added the benchmark results to the description.

@SparkQA commented Sep 7, 2014

QA tests have started for PR 2259 at commit e0131a2.

  • This patch merges cleanly.

@SparkQA commented Sep 7, 2014

QA tests have finished for PR 2259 at commit e0131a2.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 7, 2014

QA tests have started for PR 2259 at commit 6325fc1.

  • This patch merges cleanly.

@SparkQA commented Sep 7, 2014

QA tests have finished for PR 2259 at commit 6325fc1.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor Author) commented Sep 7, 2014

Jenkins, retest this please.

@SparkQA commented Sep 7, 2014

QA tests have started for PR 2259 at commit 6325fc1.

  • This patch merges cleanly.

@SparkQA commented Sep 7, 2014

Tests timed out after a configured wait of 120m.

@JoshRosen (Contributor)

Jenkins, retest this please.

@SparkQA commented Sep 7, 2014

QA tests have started for PR 2259 at commit 6325fc1.

  • This patch merges cleanly.

@SparkQA commented Sep 7, 2014

Tests timed out after a configured wait of 120m.

@mateiz (Contributor) commented Sep 8, 2014

You guys should time out the worker after some time period to avoid it always consuming resources. If we have that, I think it should be on by default -- in general it's best to minimize the number of different run configurations. However we may need to add a setting to keep the old behavior if some users have code that assumes the worker will shut down.

@davies (Contributor Author) commented Sep 8, 2014

@mateiz It times out idle workers after 1 minute. Worker reuse is the default; it can be disabled with spark.python.worker.reuse = false, in which case the worker is shut down immediately after each task completes.
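
As an illustration of the idle-timeout behavior described above (a hypothetical sketch, not the actual PySpark daemon code; the 60-second constant and the process_task callback are assumptions based on the 1-minute timeout mentioned here):

    import socket

    IDLE_TIMEOUT = 60  # seconds of inactivity before a reused worker exits

    def worker_loop(sock, process_task):
        """Serve tasks over one connection until the worker has been idle too long."""
        sock.settimeout(IDLE_TIMEOUT)
        while True:
            try:
                task = sock.recv(65536)       # wait for the next task from the executor
            except socket.timeout:
                break                         # idle for a minute: let the worker shut down
            if not task:
                break                         # connection closed by the executor
            sock.sendall(process_task(task))  # run the task and send back the result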

@SparkQA commented Sep 10, 2014

QA tests have started for PR 2259 at commit cf1c55e.

  • This patch merges cleanly.

@SparkQA commented Sep 10, 2014

Tests timed out after a configured wait of 120m.

@SparkQA commented Sep 10, 2014

Tests timed out after a configured wait of 120m.

@JoshRosen (Contributor)

Hmm, I wonder why we're seeing these timeouts. It looks like both tests failed in recommendation.py, so it might be worth running those tests locally to see whether they're running way slower after this patch.

@davies (Contributor Author) commented Sep 10, 2014

yeah, I will investigate it locally.

@davies (Contributor Author) commented Sep 10, 2014

Jenkins, retest this please.

@davies (Contributor Author) commented Sep 10, 2014

@JoshRosen The problem that caused the hang has been fixed.

@SparkQA commented Sep 10, 2014

QA tests have started for PR 2259 at commit 3939f20.

  • This patch merges cleanly.

@SparkQA commented Sep 10, 2014

QA tests have finished for PR 2259 at commit 3939f20.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RatingDeserializer(FramedSerializer):

@SparkQA commented Sep 10, 2014

QA tests have finished for PR 2259 at commit 3939f20.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	python/pyspark/serializers.py
@SparkQA commented Sep 13, 2014

QA tests have started for PR 2259 at commit f11f617.

  • This patch merges cleanly.

@SparkQA commented Sep 13, 2014

QA tests have finished for PR 2259 at commit f11f617.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JavaSparkContext(val sc: SparkContext)
    • throw new IllegalStateException("The main method in the given main class must be static")
    • class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception
    • class Dummy(object):
    • class RatingDeserializer(FramedSerializer):
    • class JavaStreamingContext(val ssc: StreamingContext) extends Closeable

@JoshRosen (Contributor)

This looks good to me; merging it into master now. I wonder if we'll see a net reduction in Jenkins flakiness due to using significantly fewer ephemeral ports in PySpark after this patch...

@asfgit closed this in 2aea0da on Sep 13, 2014
@nchammas (Contributor)

Yeah, the bad diffs are especially weird. class Dummy? Really? (Though I doubt this patch would have an impact on that.)
