
[SPARK-22025][PySpark] Speeding up fromInternal for StructField #19246

Closed
maver1ck wants to merge 3 commits

Conversation

@maver1ck (Contributor) commented Sep 15, 2017

What changes were proposed in this pull request?

Replacing per-field function calls with direct references to the converter functions can greatly speed up function calling.
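As a rough sketch of the idea (simplified and hedged: the constructor body below is illustrative, not the exact Spark code), StructField stores direct references to its data type's converter functions at construction time, so per-record code calls the converters with one less level of dispatch:

```python
class StructField(object):
    def __init__(self, name, dataType, nullable=True, metadata=None):
        self.name = name
        self.dataType = dataType
        self.nullable = nullable
        self.metadata = metadata or {}
        # Store direct references to the data type's converters so that
        # per-record calls skip the extra StructField.fromInternal dispatch.
        self.needConversion = dataType.needConversion
        self.toInternal = dataType.toInternal
        self.fromInternal = dataType.fromInternal
```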

Benchmark

df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()

Before:

420274688 function calls (410272560 primitive calls) in 1710.883 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
110000000  412.988    0.000  648.632    0.000 types.py:442(fromInternal)
 10000000  250.461    0.000  899.093    0.000 types.py:629(<listcomp>)
 30000000  188.816    0.000 1409.060    0.000 types.py:622(fromInternal)
100000000  177.706    0.000  177.706    0.000 types.py:88(fromInternal)

After:

310274584 function calls (300272456 primitive calls) in 1308.709 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 10000000  251.018    0.000  483.316    0.000 types.py:619(<listcomp>)
 30000000  190.364    0.000 1001.505    0.000 types.py:612(fromInternal)
100000000  175.242    0.000  175.242    0.000 types.py:88(fromInternal)
 20000000  155.427    0.000  325.382    0.000 types.py:1471(_create_row)

The change eliminates 110 million function calls to types.py:442(fromInternal) and gives roughly a 25% performance gain.

How was this patch tested?

Existing tests.
Performance benchmark.

@SparkQA commented Sep 15, 2017

Test build #81821 has finished for PR 19246 at commit e3dfd22.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 15, 2017

Test build #81822 has finished for PR 19246 at commit 2e6160e.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 15, 2017

Test build #81823 has finished for PR 19246 at commit f8e0ac4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) commented Sep 15, 2017

Hi, @maver1ck .
Could you add your benchmark result to the PR description for reviewers, as in your previous #19234? It will eventually become the commit log.

> greatly speed up function calling

@HyukjinKwon (Member) left a comment

I'd also explain in the PR description how the current change speeds things up.

@maver1ck (Contributor, Author)

@dongjoon-hyun
I'll do it on Monday.

```
self.toInternal = dataType.toInternal
self.fromInternal = dataType.fromInternal

def __getstate__(self):
```
@maver1ck (Contributor, Author):
We need to handle pickling ourselves because the instance now carries attributes whose values are functions.
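For illustration only, a hedged sketch of what that custom pickle handling could look like (attribute names follow the diff context above; the actual implementation in this PR may differ): the function-valued attributes are dropped from the pickled state and rebuilt from dataType after unpickling.

```python
def __getstate__(self):
    # Function-valued attributes (bound methods of dataType) do not
    # pickle cleanly, so exclude them from the serialized state.
    state = dict(self.__dict__)
    for attr in ("needConversion", "toInternal", "fromInternal"):
        state.pop(attr, None)
    return state

def __setstate__(self, state):
    self.__dict__.update(state)
    # Re-derive the function references from the restored dataType.
    self.needConversion = self.dataType.needConversion
    self.toInternal = self.dataType.toInternal
    self.fromInternal = self.dataType.fromInternal
```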

@gatorsmile (Member)
Could you mark [PySpark] in the title? cc @ueshin

@maver1ck changed the title from [SPARK-22025] Speeding up fromInternal for StructField to [SPARK-22025][PySpark] Speeding up fromInternal for StructField on Sep 17, 2017
wangyum pushed a commit to 1haodian/spark that referenced this pull request on Sep 17, 2017:
## What changes were proposed in this pull request?

StructType.fromInternal calls f.fromInternal(v) for every field.
We can use precalculated type information to limit the number of function calls (it is calculated once per StructType and reused in per-record calculations).
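A hedged sketch of that idea (simplified; the names self.names, _needConversion, _needSerializeAnyField, and _create_row are used here for illustration and may not match the patch exactly): the schema computes once which fields need conversion and skips the per-field call for those that do not.

```python
class StructType(object):
    def __init__(self, fields):
        self.fields = fields
        self.names = [f.name for f in fields]
        # Computed once per schema instead of once per record per field.
        self._needConversion = [f.needConversion() for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if obj is None:
            return None
        if self._needSerializeAnyField:
            # Only call fromInternal on fields that actually need conversion.
            values = [f.fromInternal(v) if c else v
                      for f, v, c in zip(self.fields, obj, self._needConversion)]
        else:
            values = obj
        # _create_row is the existing row constructor seen in the profiles above.
        return _create_row(self.names, values)
```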

Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 10000000  253.417    0.000  486.991    0.000 types.py:619(<listcomp>)
 30000000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
100000000  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
 20000000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
    14000  107.206    0.008 1237.917    0.088 {built-in method loads}
 20000000   80.176    0.000 1090.162    0.000 types.py:1468(<lambda>)
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
 20000000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
    14000  116.834    0.008  946.791    0.068 {built-in method loads}
 20000000   87.326    0.000  786.073    0.000 types.py:1468(<lambda>)
 20000000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
 10000000   65.777    0.000  126.712    0.000 types.py:619(<listcomp>)
```

The main difference is in types.py:619(<listcomp>) and types.py:88(fromInternal), which disappears in the After run.
The number of function calls is 100 million lower, and performance is about 20% better.

Benchmark (worst-case scenario: every column is a timestamp, so every field still needs conversion)

Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT:
The benchmark was done on top of apache#19246.
Without apache#19246 the performance improvement would be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński <[email protected]>

Closes apache#19249 from maver1ck/spark_22032.

```
self.dataType = dataType
self.nullable = nullable
self.metadata = metadata
self.needConversion = dataType.needConversion
```
Member

My main concern is that it replaces the reference to the bound method fromInternal on StructField with a method bound to another instance (the data type). I don't much like a monkey patch in Python because, IMHO, it confuses other developers, which might slow down improvement iterations from the community.
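To make the concern concrete, a small standalone illustration (hypothetical minimal classes, not Spark code): once __init__ assigns an instance attribute with the same name, attribute lookup on the instance no longer reaches the method defined on the class.

```python
class LongType(object):
    def fromInternal(self, obj):
        return obj

class StructField(object):
    def fromInternal(self, obj):
        # Defined on the class...
        return self.dataType.fromInternal(obj)

    def __init__(self, dataType):
        self.dataType = dataType
        # ...but shadowed per instance: lookups on the instance now resolve
        # to the bound method of the dataType object instead.
        self.fromInternal = dataType.fromInternal

field = StructField(LongType())
print(field.fromInternal(1))   # calls LongType.fromInternal directly
```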

I just ran the Python profiler on top of the current master with this patch:

Before

============================================================
Profile of RDD<id=13>
============================================================
         220158736 function calls (210148475 primitive calls) in 373.886 seconds

After

============================================================
Profile of RDD<id=13>
============================================================
         210149857 function calls (200139596 primitive calls) in 377.577 seconds

The improvement does not look very significant.

Member

WDYT @ueshin?

@ueshin (Member) commented Sep 20, 2017

What's the difference between your benchmark and @maver1ck's? Why are the improvements so different?
If the improvement is not significant, we shouldn't take this patch because it will confuse developers, as you said.

Member

Ah, I guess @maver1ck ran the benchmark on top of #19249. I ran the same code as in the PR description. I will double-check and come back with the commands I ran.

Member

At the current master, 718bbc9

Before

./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
         220158736 function calls (210148475 primitive calls) in 379.599 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000000/20000000   88.015    0.000  263.471    0.000 types.py:623(fromInternal)
 20000000   83.744    0.000  130.714    0.000 types.py:1421(_create_row)
      683   62.466    0.091  358.079    0.524 {cPickle.loads}
 20000000   21.786    0.000  285.257    0.000 types.py:1418(<lambda>)
 10000000   18.998    0.000   18.998    0.000 {zip}
 20000000   16.783    0.000   32.260    0.000 types.py:1469(__new__)
 30045761   16.197    0.000   16.197    0.000 {isinstance}
 20000000   15.477    0.000   15.477    0.000 {built-in method __new__ of type object at 0x10db7b428}
 20000000   14.710    0.000   14.710    0.000 types.py:1553(__setattr__)
 10000008   14.361    0.000  377.376    0.000 rdd.py:1040(<genexpr>)
 20000000    9.984    0.000    9.984    0.000 types.py:1417(_create_row_inbound_converter)
 10000000    9.579    0.000   19.590    0.000 types.py:440(fromInternal)
...

After

curl -O https://patch-diff.githubusercontent.com/raw/apache/spark/pull/19246.patch
git apply 19246.patch
git diff
./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
         210149857 function calls (200139596 primitive calls) in 385.988 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000000/20000000   92.012    0.000  265.554    0.000 types.py:632(fromInternal)
 20000000   87.470    0.000  137.298    0.000 types.py:1430(_create_row)
      683   65.402    0.096  364.590    0.534 {cPickle.loads}
 20000000   22.989    0.000  288.543    0.000 types.py:1427(<lambda>)
 10000000   19.146    0.000   19.146    0.000 {zip}
 20000000   17.881    0.000   33.933    0.000 types.py:1478(__new__)
 30045761   17.121    0.000   17.121    0.000 {isinstance}
 20000000   16.052    0.000   16.052    0.000 {built-in method __new__ of type object at 0x10153d428}
 20000000   15.894    0.000   15.894    0.000 types.py:1562(__setattr__)
 10000008   14.938    0.000  383.739    0.000 rdd.py:1040(<genexpr>)
 20000000   10.214    0.000   10.214    0.000 types.py:1426(_create_row_inbound_converter)
       16    2.248    0.140  385.986   24.124 {sum}
     1374    2.228    0.002    2.228    0.002 {method 'read' of 'file' objects}

@HyukjinKwon (Member)
I'd close this PR if there is no objection, @maver1ck, and if I haven't missed something.

@maver1ck (Contributor, Author)
@HyukjinKwon
I created this before #19249, which greatly decreases the number of function calls.

I agree we can close it.

@maver1ck closed this Sep 20, 2017
@HyukjinKwon (Member)
Yea, thanks. That was a cool patch BTW.
