[SPARK-30434][PYTHON][SQL] Move pandas related functionalities into 'pandas' sub-package #27109
Conversation
@BryanCutler and @ueshin, I think we talked about this once before - this PR targets moving everything related to pandas and pyarrow into one package.
@@ -185,248 +185,6 @@ def loads(self, obj):
        raise NotImplementedError


class ArrowCollectSerializer(Serializer):
All classes are moved as-is to pyspark.sql.pandas.serializers.
@@ -2135,193 +2135,6 @@ def transform(self, func):
                "should have been DataFrame." % type(result)
        return result

    @since(1.3)
    def toPandas(self):
Moved to PandasConversionMixin under pyspark.sql.pandas.conversion.
                _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
        return pdf

    def mapInPandas(self, udf):
Moved to PandasMapOpsMixin under pyspark.sql.pandas.map_ops.
@@ -2814,22 +2816,6 @@ def from_csv(col, schema, options={}):


# ---------------------------- User Defined Function ----------------------------------

class PandasUDFType(object):
Moved to pyspark.sql.pandas.functions
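Since the PR description states there is no user-facing change, the existing public import path presumably keeps working via re-exports while the defining module moves; a small sketch of that assumption:

from pyspark.sql.functions import pandas_udf, PandasUDFType         # public path, expected to keep working
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType  # new defining module after this PR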
@@ -219,68 +219,6 @@ def pivot(self, pivot_col, values=None):
        jgd = self._jgd.pivot(pivot_col, values)
        return GroupedData(jgd, self._df)

    @since(3.0)
Moved to PandasGroupedOpsMixin under pyspark.sql.pandas.group_ops.
@@ -458,135 +459,6 @@ def _createFromLocal(self, data, schema):
            data = [schema.toInternal(row) for row in data]
        return self._sc.parallelize(data), schema

    def _get_numpy_record_dtype(self, rec):
All moved to SparkConversionMixin under pyspark.sql.pandas.conversion.
@@ -722,46 +594,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
        except Exception:
            has_pandas = False
        if has_pandas and isinstance(data, pandas.DataFrame):
            from pyspark.sql.utils import require_minimum_pandas_version
This one needs to be reviewed - I had to split the methods.
@@ -1608,267 +1608,6 @@ def convert(self, obj, gateway_client):
register_input_converter(DateConverter())


def to_arrow_type(dt):
All moved to pyspark.sql.pandas.types
cc @cloud-fan and @viirya for better visibility - it's also related to the pandas revisit work.
__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]


-class DataFrame(object):
+class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
Could you explain why mixins are used here? Since these are never used outside this context nor (as far as I can tell) mimic the Scala counterpart, the approach seems a bit counterintuitive.
Wouldn't it make more sense to use proper instances, the same way as DataFrameNaFunctions or DataFrameStatFunctions?
This question also applies to GroupedData and its mixin.
Ah, you mean API usage like:

df.pandas.mapInPandas(...)

via defining an instance like DataFrameNaFunctions or DataFrameStatFunctions? I actually considered this approach first. If I were writing it from scratch, I think I could consider this option without worrying about backward compatibility.
If you mean defining classes like DataFrameNaFunctions or DataFrameStatFunctions and calling them directly from the DataFrame or SparkSession API methods, the API names still have to be exposed at the DataFrame or SparkSession level .. which is ugly.
Also, Scala folks are pretty used to mixins. If we think of it as a self-type trait, I think it fits the purpose.
Lastly (trivial but still), I wanted to make the code a bit more pandas-friendly by mimicking pandas' style, in case we get more pandas developers involved.
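For reference, a rough sketch of the accessor-style alternative being discussed here; the PandasNamespace class and the pandas property are hypothetical and not part of this PR:

class PandasNamespace(object):
    """Hypothetical accessor object, analogous to DataFrameNaFunctions/DataFrameStatFunctions."""
    def __init__(self, df):
        self._df = df

    def mapInPandas(self, udf):
        ...  # pandas-specific implementation working on self._df


class DataFrame(object):
    @property
    def pandas(self):
        # would enable the df.pandas.mapInPandas(...) style mentioned above
        return PandasNamespace(self)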
In general I am trying to get a better feel for the overall purpose of such refactoring.
As of now there is no indication that any of these mixins will ever be used outside the current context (DataFrame and GroupedData). That impression is further reinforced by explicit type checks (here and here). So this doesn't really seem like a canonical use of a mixin, especially when the base DataFrame is not designed for extensibility.

> Ah, you mean API usage like:
> df.pandas.mapInPandas(...)

That's one possible approach, though not the one I was thinking about. I assumed (though I am not sure, as the amount of code moved, excluding docs, messages and some static stuff, is negligible and tightly coupled with DataFrame anyway) that the point is maintainability.
So a possible approach is either direct:

def __init__(self, ...):
    ...
    self._pandasMapOpsMixin = PandasMapOpsMixin(self)
    ...

def mapInPandas(self, udf):
    return self._pandasMapOpsMixin.mapInPandas(udf)

or indirect (by overriding __getattr__).
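A minimal sketch of that indirect __getattr__ delegation, assuming a composed helper object (names here are illustrative, not code from the PR):

class PandasMapOps(object):
    def __init__(self, df):
        self._df = df

    def mapInPandas(self, udf):
        ...  # pandas-specific logic using self._df


class DataFrame(object):
    def __init__(self, jdf, sql_ctx):
        self._jdf = jdf
        self.sql_ctx = sql_ctx
        self._pandas_map_ops = PandasMapOps(self)

    def __getattr__(self, name):
        # Only called for attributes not found normally; delegates
        # e.g. df.mapInPandas(...) to the composed helper.
        return getattr(self._pandas_map_ops, name)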
Yeah,

def __init__(self, ...):
    ...
    self._pandasMapOpsMixin = PandasMapOpsMixin(self)
    ...

def mapInPandas(self, udf):
    return self._pandasMapOpsMixin.mapInPandas(udf)

With this one, DataFrame or SparkSession has to expose the methods themselves in their own classes. So, if you want to add some new pandas APIs, they have to be added to PandasMapOpsMixin and then also to DataFrame. Using __getattr__ looks like overkill to me and a bit odd.
The point I wanted to make is that the current DataFrame with the mixin approach is closer to the Scala side's Dataset, since it does not have to expose such APIs itself.
Also, I believe what I am doing is what a self-type trait is supposed to do: it is coupled to a specific type, and other types cannot implement this trait.
I meant:

class PandasMapOps(object):
    def mapInPandas(self, ...):
        ...
        return ...
    # other Pandas <> PySpark APIs

class DataFrame(object):
    def __init__(self, ...):
        ...
        self.pandas_map_ops = PandasMapOps(self)

    # other DataFrame APIs equivalent to Scala side.
    def mapInPandas(self, ...):
        return self.pandas_map_ops.mapInPandas(...)

vs

class PandasMapOpsMixin(object):
    def mapInPandas(self, ...):
        ...
        return ...
    # other Pandas <> PySpark APIs

class DataFrame(PandasMapOpsMixin):
    # other DataFrame APIs equivalent to Scala side.
    ...

I thought the latter is better.
I think both are fine. It's internal so we can change it later.
It is hardly internal, considering that the mixed-in classes are "public" (as much as it is meaningful to speak of access control in Python) and, in contrast to other changes proposed here, all of the shifted non-static methods are part of the external API. The impact so far is small though, if that's what you mean.

> Also, I believe what I am doing is what a self-type trait is supposed to do: it is coupled to a specific type, and other types cannot implement this trait.

Such patterns, or their closest Python equivalents (see for example Django mixins for class-based views), typically indicate two things:
- Potential for inheritance, which is clearly not the case, given both the Spark API and the design of the DataFrame class.
- A non-obligatory character, which once again is not the case.

So I guess the question I am trying to ask is "what future planned changes justify such a move" - as of now it seems mostly unnecessary, and the lower-effort path, given the implied time pressure, would be to keep things as-is.
> It is hardly internal, considering that the mixed-in classes are "public" ... The impact so far is small though, if that's what you mean.

Either way, the same APIs are exposed in the same class. It will also be easy to switch from one to the other with minimal changes - I actually already roughly tested that.
> Either way, the same APIs are exposed in the same class. It will also be easy to switch from one to the other with minimal changes.

While that is true, it is not equivalent from the perspective of type checkers. And since a lot of the related discussion circles around typing, it might be something to consider.
Nonetheless, I can take a hint.
What I am trying to do is basically to separate the pandas-related APIs from the Scala-equivalent APIs.
So, if you want to make some changes in the pandas-related APIs, you make them in PandasMapOpsMixin. If you need to make some changes in the Scala-equivalent APIs, you make them in DataFrame.
Both are completely separate in the current approach. With the suggestion, they would get mixed up.
+1 on @HyukjinKwon's suggestion.
Retest this please.
Test build #116240 has finished for PR 27109 at commit
+1, having all the Pandas functionality in one place will make things easier
        return pdf

    @staticmethod
    def _to_corrected_pandas_type(dt):
Here, I piggyback a nit fix - I made this a staticmethod. It used to be a module-level function.
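In other words, roughly (a sketch of the shape of the change, not the full body):

class PandasConversionMixin(object):
    @staticmethod
    def _to_corrected_pandas_type(dt):
        # Previously a module-level helper in pyspark.sql.dataframe;
        # maps a Spark data type to the pandas/numpy dtype it should be coerced to (or None).
        ...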
        else:
            return None

    def _collect_as_arrow(self):
Here, I piggyback a nit fix. It was _collectAsArrow. Unless it is meant to match the Scala side, it should follow snake_case naming per PEP 8.
""" | ||
from pyspark.sql.dataframe import DataFrame | ||
|
||
assert isinstance(self, DataFrame) |
Here, this assert was added so that IDEs can detect that self is a DataFrame.
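As a small illustration of the pattern (a sketch; the actual method bodies live in pyspark.sql.pandas.conversion):

class PandasConversionMixin(object):
    def toPandas(self):
        from pyspark.sql.dataframe import DataFrame

        # The mixin is only ever mixed into DataFrame; the assert documents that
        # and lets IDEs and type checkers treat `self` as a DataFrame below.
        assert isinstance(self, DataFrame)
        ...  # conversion logic may now rely on DataFrame attributes of self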
@@ -179,6 +179,8 @@ def _supports_symlinks():
    'pyspark.ml.linalg',
    'pyspark.ml.param',
    'pyspark.sql',
    'pyspark.sql.avro',
Here, I piggyback another fix by adding pyspark.sql.avro. See https://github.com/apache/spark/pull/27109/files#r364017167
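So the packages list in setup.py presumably ends up including entries along these lines (only the lines visible in the diff above are certain; the pyspark.sql.pandas entry is my assumption):

    'pyspark.ml.linalg',
    'pyspark.ml.param',
    'pyspark.sql',
    'pyspark.sql.avro',      # piggybacked fix mentioned above
    'pyspark.sql.pandas',    # new sub-package introduced by this PR (assumed entry)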
@@ -7,6 +7,7 @@ Module Context
.. automodule:: pyspark.sql
    :members:
    :undoc-members:
    :inherited-members:
I checked every API to see if it affects the API documentation. It only affects Row, which inherits from tuple. It adds two APIs inherited from tuple, which seems legitimate.
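The two members inherited from tuple are presumably count and index, which now show up in the Row documentation; a quick illustration:

from pyspark.sql import Row

row = Row("Alice", 2)
row.count(2)        # 1 - tuple.count, now documented on Row
row.index("Alice")  # 0 - tuple.index, now documented on Row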
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.pandas.conversion, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
I piggyback another change here - doctest.REPORT_NDIFF was added. Some files had it and some didn't, so I decided to add it consistently.
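For context, REPORT_NDIFF makes doctest print an ndiff-style comparison on failure instead of separate expected/got blocks; a minimal standalone example of the same flag combination:

import doctest

def add(a, b):
    """
    >>> add(1, 1)
    2
    """
    return a + b

if __name__ == "__main__":
    doctest.testmod(
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)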
@@ -362,6 +362,13 @@ def __hash__(self):
        "pyspark.sql.udf",
        "pyspark.sql.window",
        "pyspark.sql.avro.functions",
        "pyspark.sql.pandas.conversion",
BTW @BryanCutler, @icexelloss, @ueshin, please feel free to change the package/module/class names separately if you don't like them. It will likely cause few or no conflicts with the work I'm doing.
Looks good, as I also sometimes get lost finding pandas-related stuff.
+1, LGTM.
Thanks all. I will merge this tomorrow if I don't see any major concerns.
Merged to master.
    Mix-in for the conversion from pandas to Spark. Currently, only :class:`SparkSession`
    can use this class.
    """
    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
Instead of shadowing SparkSession.createDataFrame and later having to disambiguate, it would be more idiomatic to just name-mangle, like this:

def __createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):

and then call it as:

return self._SparkConversionMixin__createDataFrame(

It would also be easier to add new mixins later, if that proves to be useful for some reason (here is a possible diff).
Additionally, we could limit the responsibility to data and schema preparation only (diff for this). In addition to the benefits mentioned above, it would allow us to decouple the mixin from SparkSession (as we'd only need _wrapped._conf).
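For readers less familiar with it: a double-underscore method defined in a class is stored as _ClassName__method, so the mixin's version never shadows the public SparkSession.createDataFrame. A rough sketch of the suggestion (illustrative only; _is_pandas_dataframe is a hypothetical helper, not code from this PR):

def _is_pandas_dataframe(data):
    # hypothetical helper used only for this sketch
    try:
        import pandas
        return isinstance(data, pandas.DataFrame)
    except ImportError:
        return False


class SparkConversionMixin(object):
    def __createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        # name-mangled to _SparkConversionMixin__createDataFrame
        ...  # pandas-specific conversion path


class SparkSession(SparkConversionMixin):
    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        if _is_pandas_dataframe(data):
            # explicit, unambiguous call into the mixin's mangled method
            return self._SparkConversionMixin__createDataFrame(
                data, schema, samplingRatio, verifySchema)
        ...  # regular (non-pandas) path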
I did it the current way so that SparkConversionMixin.createDataFrame can work on its own. I didn't like the second way because it's weird to return both schema and data.
_SparkConversionMixin__createDataFrame looks fine in that it makes clear which mixin is used, but we don't really use name mangling anywhere else in the current codebase either.
> I did it the current way so that SparkConversionMixin.createDataFrame can work on its own.

But it cannot, can it? It still has to be mixed into something that provides these methods.

> I didn't like the second way because it's weird to return both schema and data.

Matter of taste, I guess.

> _SparkConversionMixin__createDataFrame looks fine in that it makes clear which mixin is used, but we don't really use name mangling anywhere else in the current codebase either.

That's true, though I am not aware of a case where such a conflict might occur. We use mixins for Params, but there conflicts are either unlikely or irrelevant. Here both methods are used, and it is theoretically possible that other similar mixins might follow - say pyspark.sql.ray.conversions or pyspark.sql.dask.conversions, either here or in third-party extensions. In such cases we might prefer a consistent interface.
Anyway, I don't have a very strong opinion about this. I mentioned it primarily because the corresponding type hints looked funky.
There is however one immediate side effect of the current approach - it makes passing methods as values a bit unreliable (and, by extension, anything that depends on it, like monkey patching).
…ent in createDataFrame

### What changes were proposed in this pull request?
This is a followup of #27109. It should match the parameter lists in `createDataFrame`.

### Why are the changes needed?
To pass the parameters that are supposed to be passed.

### Does this PR introduce any user-facing change?
No (it's only in master).

### How was this patch tested?
Manually tested, and existing tests should cover it.

Closes #27225 from HyukjinKwon/SPARK-30434-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
What changes were proposed in this pull request?
This PR proposes to move the pandas-related functionality into a pandas sub-package.
In order to separately locate groupby.apply, groupby.cogroup.apply, mapInPandas, toPandas, and createDataFrame(pdf) under the pandas sub-package, I had to use a mix-in approach, which the Scala side often uses via trait, and which pandas itself also uses (see IndexOpsMixin as an example) to group related functionality. Currently, you can think of it as something like Scala's self-typed trait. See the structure below.
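A partial sketch of the resulting layout, reconstructed from the modules mentioned elsewhere in this review (the actual PR may contain additional files):

pyspark/sql/pandas/
    __init__.py
    conversion.py     # PandasConversionMixin (toPandas), SparkConversionMixin (createDataFrame from pandas)
    functions.py      # pandas_udf, PandasUDFType
    group_ops.py      # PandasGroupedOpsMixin (grouped / cogrouped apply)
    map_ops.py        # PandasMapOpsMixin (mapInPandas)
    serializers.py    # ArrowCollectSerializer and related Arrow serializers
    types.py          # to_arrow_type and other pandas/Arrow type conversions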
Yes, this is a big PR, but it is mostly just moving things around, except for one case, createDataFrame, where I had to split the methods.

Why are the changes needed?
There are pandas functionalities here and there, and I myself get lost trying to find them. Also, when you have to make a change common to all of the pandas-related features, it's almost impossible now.
Also, after this change, DataFrame and SparkSession become more consistent with the Scala side, since pandas is specific to Python, and this change separates the pandas-specific APIs away from DataFrame and SparkSession.

Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests should cover this. Also, I manually built the PySpark API documentation and checked it.