[SPARK-22025][PySpark] Speeding up fromInternal for StructField #19246

Closed. Wants to merge 3 commits.
27 changes: 18 additions & 9 deletions python/pyspark/sql/types.py
@@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None):
         self.dataType = dataType
         self.nullable = nullable
         self.metadata = metadata or {}
+        self.needConversion = dataType.needConversion
+        self.toInternal = dataType.toInternal
+        self.fromInternal = dataType.fromInternal
+
+    def __getstate__(self):
+        """Return state values to be pickled."""
+        return (self.name, self.dataType, self.nullable, self.metadata)

Comment from the contributor (author): We need to handle pickling ourselves because we have fields with function values.
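A minimal sketch of the issue (with hypothetical `Field`/`LongType` stand-ins, not Spark's classes): bound methods stored in an instance's `__dict__` are not picklable under Python 2's cPickle, so `__getstate__` reduces the state to plain values and `__setstate__` re-derives the cached methods on restore.

```python
import pickle

class LongType(object):
    def fromInternal(self, obj):
        return obj

class Field(object):
    """Simplified stand-in for StructField that caches bound methods."""
    def __init__(self, name, dataType):
        self.name = name
        self.dataType = dataType
        # Cached bound method: not picklable under Python 2's cPickle.
        self.fromInternal = dataType.fromInternal

    def __getstate__(self):
        # Reduce the state to plain, picklable values only.
        return (self.name, self.dataType)

    def __setstate__(self, state):
        name, dataType = state
        self.name = name
        self.dataType = dataType
        # Re-derive the cached method from the restored dataType.
        self.fromInternal = dataType.fromInternal

f = pickle.loads(pickle.dumps(Field("id", LongType())))
assert f.fromInternal(42) == 42
```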

+    def __setstate__(self, state):
+        """Restore state from the unpickled state values."""
+        name, dataType, nullable, metadata = state
+        self.name = name
+        self.dataType = dataType
+        self.nullable = nullable
+        self.metadata = metadata
+        self.needConversion = dataType.needConversion
Comment from a member:
My only main concern is that it replaces the reference to the bound method fromStructType with another method bound to another instance. I don't actually like a monkey patch in Python because, IMHO, it confuses other developers, which might slow down the improvement iteration from the community.

I just ran the Python profiler on top of the current master with this patch:

Before

============================================================
Profile of RDD<id=13>
============================================================
         220158736 function calls (210148475 primitive calls) in 373.886 seconds

After

============================================================
Profile of RDD<id=13>
============================================================
         210149857 function calls (200139596 primitive calls) in 377.577 seconds

It looks like the improvement is not that significant.
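To make the bound-method concern concrete, here is a minimal sketch (hypothetical `Field`/`IntType` names, not Spark's code): after the assignment in `__init__`, the attribute on the field instance is a method bound to the dataType instance, not to the field itself, which is what an unsuspecting reader may not expect.

```python
class IntType(object):
    def fromInternal(self, obj):
        return obj

class Field(object):
    def __init__(self, dataType):
        self.dataType = dataType
        # The instance attribute shadows any method named fromInternal
        # defined on Field; lookups now return IntType's bound method.
        self.fromInternal = dataType.fromInternal

f = Field(IntType())
print(f.fromInternal.__self__ is f)           # False
print(f.fromInternal.__self__ is f.dataType)  # True: bound to the dataType
```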

Comment from a member:
WDYT @ueshin?

Comment from @ueshin (Member, Sep 20, 2017):
What's the difference between your benchmark and @maver1ck's? Why are the improvements so different?
If the improvement is not that significant, we shouldn't take this patch because it'll confuse developers, as you said.

Comment from a member:
Ah, I think @maver1ck did this together with #19249, I guess. I ran the same code as in the PR description. I will double-check and come back with the commands I ran.

Comment from a member:
At the current master, 718bbc9

Before

./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
         220158736 function calls (210148475 primitive calls) in 379.599 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000000/20000000   88.015    0.000  263.471    0.000 types.py:623(fromInternal)
 20000000   83.744    0.000  130.714    0.000 types.py:1421(_create_row)
      683   62.466    0.091  358.079    0.524 {cPickle.loads}
 20000000   21.786    0.000  285.257    0.000 types.py:1418(<lambda>)
 10000000   18.998    0.000   18.998    0.000 {zip}
 20000000   16.783    0.000   32.260    0.000 types.py:1469(__new__)
 30045761   16.197    0.000   16.197    0.000 {isinstance}
 20000000   15.477    0.000   15.477    0.000 {built-in method __new__ of type object at 0x10db7b428}
 20000000   14.710    0.000   14.710    0.000 types.py:1553(__setattr__)
 10000008   14.361    0.000  377.376    0.000 rdd.py:1040(<genexpr>)
 20000000    9.984    0.000    9.984    0.000 types.py:1417(_create_row_inbound_converter)
 10000000    9.579    0.000   19.590    0.000 types.py:440(fromInternal)
...

After

curl -O https://patch-diff.githubusercontent.com/raw/apache/spark/pull/19246.patch
git apply 19246.patch
git diff
./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package
find . -name "*.pyc" -exec rm -f {} \;
sync && sudo purge
./bin/pyspark --conf spark.python.profile=true
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
sc.show_profiles()
============================================================
Profile of RDD<id=13>
============================================================
         210149857 function calls (200139596 primitive calls) in 385.988 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
30000000/20000000   92.012    0.000  265.554    0.000 types.py:632(fromInternal)
 20000000   87.470    0.000  137.298    0.000 types.py:1430(_create_row)
      683   65.402    0.096  364.590    0.534 {cPickle.loads}
 20000000   22.989    0.000  288.543    0.000 types.py:1427(<lambda>)
 10000000   19.146    0.000   19.146    0.000 {zip}
 20000000   17.881    0.000   33.933    0.000 types.py:1478(__new__)
 30045761   17.121    0.000   17.121    0.000 {isinstance}
 20000000   16.052    0.000   16.052    0.000 {built-in method __new__ of type object at 0x10153d428}
 20000000   15.894    0.000   15.894    0.000 types.py:1562(__setattr__)
 10000008   14.938    0.000  383.739    0.000 rdd.py:1040(<genexpr>)
 20000000   10.214    0.000   10.214    0.000 types.py:1426(_create_row_inbound_converter)
       16    2.248    0.140  385.986   24.124 {sum}
     1374    2.228    0.002    2.228    0.002 {method 'read' of 'file' objects}

+        self.toInternal = dataType.toInternal
+        self.fromInternal = dataType.fromInternal
 
     def simpleString(self):
         return '%s:%s' % (self.name, self.dataType.simpleString())
@@ -431,15 +449,6 @@ def fromJson(cls, json):
                            json["nullable"],
                            json["metadata"])
 
-    def needConversion(self):
-        return self.dataType.needConversion()
-
-    def toInternal(self, obj):
-        return self.dataType.toInternal(obj)
-
-    def fromInternal(self, obj):
-        return self.dataType.fromInternal(obj)

     def typeName(self):
         raise TypeError(
             "StructField does not have typeName. "