[SPARK-22025][PySpark] Speeding up fromInternal for StructField #19246
Conversation
Test build #81821 has finished for PR 19246 at commit
Test build #81822 has finished for PR 19246 at commit
Test build #81823 has finished for PR 19246 at commit
I'd also explain how the current change speeds things up in the PR description.
@dongjoon-hyun
        self.toInternal = dataType.toInternal
        self.fromInternal = dataType.fromInternal

    def __getstate__(self):
We need to handle pickling ourselves because we have fields with function values (the cached bound methods).
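A minimal, self-contained sketch of that pattern, with illustrative class names rather than the actual pyspark.sql.types code: when an object caches bound methods as instance attributes, default pickling (on Python 2 in particular) cannot serialize them, so `__getstate__` drops them and `__setstate__` rebuilds them.

```python
import pickle


class DataTypeLike(object):
    """Illustrative stand-in for a data type with per-value converters."""

    def fromInternal(self, obj):
        return obj

    def toInternal(self, obj):
        return obj


class FieldLike(object):
    """Illustrative stand-in for a field that caches bound-method references."""

    def __init__(self, dataType):
        self.dataType = dataType
        # Cached references to bound methods of another object; these are the
        # function-valued attributes that break default pickling.
        self.fromInternal = dataType.fromInternal
        self.toInternal = dataType.toInternal

    def __getstate__(self):
        # Pickle everything except the function-valued attributes.
        state = dict(self.__dict__)
        state.pop("fromInternal", None)
        state.pop("toInternal", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Rebuild the cached references after unpickling.
        self.fromInternal = self.dataType.fromInternal
        self.toInternal = self.dataType.toInternal


if __name__ == "__main__":
    field = FieldLike(DataTypeLike())
    restored = pickle.loads(pickle.dumps(field))
    assert restored.fromInternal(1) == 1
```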
Could you mark [PySpark] in the title? cc @ueshin
## What changes were proposed in this pull request?

StructType.fromInternal is calling f.fromInternal(v) for every field. We can use precalculated information about the type to limit the number of function calls (it's calculated once per StructType and used in per-record calculations).

Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls    tottime  percall   cumtime  percall  filename:lineno(function)
 10000000    253.417    0.000   486.991    0.000  types.py:619(<listcomp>)
 30000000    192.272    0.000  1009.986    0.000  types.py:612(fromInternal)
100000000    176.140    0.000   176.140    0.000  types.py:88(fromInternal)
 20000000    156.832    0.000   328.093    0.000  types.py:1471(_create_row)
    14000    107.206    0.008  1237.917    0.088  {built-in method loads}
 20000000     80.176    0.000  1090.162    0.000  types.py:1468(<lambda>)
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls    tottime  percall   cumtime  percall  filename:lineno(function)
 30000000    215.845    0.000   698.748    0.000  types.py:612(fromInternal)
 20000000    165.042    0.000   351.572    0.000  types.py:1471(_create_row)
    14000    116.834    0.008   946.791    0.068  {built-in method loads}
 20000000     87.326    0.000   786.073    0.000  types.py:1468(<lambda>)
 20000000     85.477    0.000   134.607    0.000  types.py:1519(__new__)
 10000000     65.777    0.000   126.712    0.000  types.py:619(<listcomp>)
```

The main difference is in types.py:619(<listcomp>) and types.py:88(fromInternal) (which disappears in the After profile). The number of function calls is 100 million lower, and performance is 20% better.

Benchmark (worst-case scenario)

Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT: The benchmark was done on top of apache#19246. Without apache#19246 the performance improvement will be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński <[email protected]>

Closes apache#19249 from maver1ck/spark_22032.
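To make the precalculation idea in that description concrete, here is a minimal sketch using simplified, hypothetical classes rather than the real pyspark.sql.types implementation: the per-field decision of whether a value needs conversion is made once per struct, so fields that need no conversion pay no per-record function call at all.

```python
import datetime


class LongLikeType(object):
    """Illustrative type whose values need no conversion (like LongType)."""

    def needConversion(self):
        return False

    def fromInternal(self, v):
        return v


class TimestampLikeType(object):
    """Illustrative type whose internal values (microseconds since the epoch)
    do need conversion (like TimestampType)."""

    def needConversion(self):
        return True

    def fromInternal(self, v):
        return datetime.datetime.utcfromtimestamp(v / 1e6) if v is not None else None


class StructLikeType(object):
    """Sketch of the precalculation: per-field decisions happen once here,
    not once per record inside fromInternal."""

    def __init__(self, fields):
        self.fields = fields
        self._needConversion = [f.needConversion() for f in fields]
        self._converters = [f.fromInternal for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if not self._needSerializeAnyField:
            # Fast path: nothing to convert, so no per-field calls at all.
            return list(obj)
        return [conv(v) if need else v
                for need, conv, v in zip(self._needConversion, self._converters, obj)]


struct = StructLikeType([LongLikeType(), TimestampLikeType()])
print(struct.fromInternal([42, 1505000000000000]))  # [42, datetime in 2017]
```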
        self.dataType = dataType
        self.nullable = nullable
        self.metadata = metadata
        self.needConversion = dataType.needConversion
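For context, a small sketch (hypothetical names, not the Spark code) of what the cached references in the diff above buy: the field no longer pays for a forwarding call of its own on every record, because its fromInternal attribute points straight at the data type's bound method.

```python
class IntLikeType(object):
    def fromInternal(self, obj):
        return obj


class DelegatingField(object):
    """Before: every per-record call pays for an extra forwarding frame."""

    def __init__(self, dataType):
        self.dataType = dataType

    def fromInternal(self, obj):
        return self.dataType.fromInternal(obj)


class ReferenceCachingField(object):
    """After: the instance attribute is the data type's bound method itself,
    so field.fromInternal(obj) dispatches with no intermediate frame."""

    def __init__(self, dataType):
        self.dataType = dataType
        self.fromInternal = dataType.fromInternal


before = DelegatingField(IntLikeType())
after = ReferenceCachingField(IntLikeType())
assert before.fromInternal(7) == after.fromInternal(7) == 7
```

The trade-off raised in the review below is that the cached attribute is no longer a method of the field class itself, so it has to be excluded from pickling and can surprise readers of the code.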
My main concern is that it replaces the reference to the bound fromInternal method with another method bound to another instance. I don't actually quite like a monkey patch in Python because, IMHO, it confuses other developers, which might slow down the improvement iteration from the community.
I just ran the Python profiler on top of the current master with this patch:
Before
============================================================
Profile of RDD<id=13>
============================================================
220158736 function calls (210148475 primitive calls) in 373.886 seconds
After
============================================================
Profile of RDD<id=13>
============================================================
210149857 function calls (200139596 primitive calls) in 377.577 seconds
Looks like the improvement is not quite significant.
WDYT @ueshin?
What's the difference between your benchmark and @maver1ck's? Why are the improvements so different?
If the improvement is not quite significant, we shouldn't take this patch because it'll confuse developers, as you said.
At the current master, 718bbc9
Before
./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
220158736 function calls (210148475 primitive calls) in 379.599 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
30000000/20000000 88.015 0.000 263.471 0.000 types.py:623(fromInternal)
20000000 83.744 0.000 130.714 0.000 types.py:1421(_create_row)
683 62.466 0.091 358.079 0.524 {cPickle.loads}
20000000 21.786 0.000 285.257 0.000 types.py:1418(<lambda>)
10000000 18.998 0.000 18.998 0.000 {zip}
20000000 16.783 0.000 32.260 0.000 types.py:1469(__new__)
30045761 16.197 0.000 16.197 0.000 {isinstance}
20000000 15.477 0.000 15.477 0.000 {built-in method __new__ of type object at 0x10db7b428}
20000000 14.710 0.000 14.710 0.000 types.py:1553(__setattr__)
10000008 14.361 0.000 377.376 0.000 rdd.py:1040(<genexpr>)
20000000 9.984 0.000 9.984 0.000 types.py:1417(_create_row_inbound_converter)
10000000 9.579 0.000 19.590 0.000 types.py:440(fromInternal)
...
After
curl -O https://patch-diff.githubusercontent.com/raw/apache/spark/pull/19246.patch
git apply 19246.patch
git diff
./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
210149857 function calls (200139596 primitive calls) in 385.988 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
30000000/20000000 92.012 0.000 265.554 0.000 types.py:632(fromInternal)
20000000 87.470 0.000 137.298 0.000 types.py:1430(_create_row)
683 65.402 0.096 364.590 0.534 {cPickle.loads}
20000000 22.989 0.000 288.543 0.000 types.py:1427(<lambda>)
10000000 19.146 0.000 19.146 0.000 {zip}
20000000 17.881 0.000 33.933 0.000 types.py:1478(__new__)
30045761 17.121 0.000 17.121 0.000 {isinstance}
20000000 16.052 0.000 16.052 0.000 {built-in method __new__ of type object at 0x10153d428}
20000000 15.894 0.000 15.894 0.000 types.py:1562(__setattr__)
10000008 14.938 0.000 383.739 0.000 rdd.py:1040(<genexpr>)
20000000 10.214 0.000 10.214 0.000 types.py:1426(_create_row_inbound_converter)
16 2.248 0.140 385.986 24.124 {sum}
1374 2.228 0.002 2.228 0.002 {method 'read' of 'file' objects}
I'd close this PR if there is no objection, @maver1ck, and I didn't miss something.
@HyukjinKwon I agree we can close it.
Yea, thanks. That was a cool patch BTW.
What changes were proposed in this pull request?
Changing function calls to references can greatly speed up function calling.
Benchmark
Before:
After:
The change makes 110 million fewer function calls in types.py:442(fromInternal) and gives about a 25% performance gain.
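The Spark-level numbers above can't be reproduced outside a cluster, but the underlying effect of replacing a forwarding call with a cached bound-method reference can be sanity-checked with a small timeit micro-benchmark; the class and variable names below are illustrative, not the Spark code, and the absolute timings depend on the interpreter.

```python
import timeit

setup = """
class DataTypeLike(object):
    def fromInternal(self, v):
        return v

class ForwardingField(object):
    def __init__(self, dt):
        self.dataType = dt
    def fromInternal(self, v):
        # Extra Python function call on every value.
        return self.dataType.fromInternal(v)

class CachingField(object):
    def __init__(self, dt):
        # Direct reference to the data type's bound method.
        self.fromInternal = dt.fromInternal

forwarding = ForwardingField(DataTypeLike())
caching = CachingField(DataTypeLike())
"""

n = 1000000
print("forwarding:", timeit.timeit("forwarding.fromInternal(1)", setup=setup, number=n))
print("caching:   ", timeit.timeit("caching.fromInternal(1)", setup=setup, number=n))
```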
How was this patch tested?
Existing tests.
Performance benchmark.