
[SPARK-30434][PYTHON][SQL] Move pandas related functionalities into 'pandas' sub-package #27109

Closed
Wants to merge 2 commits into master from pandas-refactoring

Conversation


@HyukjinKwon HyukjinKwon commented Jan 6, 2020

What changes were proposed in this pull request?

This PR proposes to move pandas-related functionality into a pandas sub-package. Namely:

pyspark/sql/pandas
├── __init__.py
├── conversion.py  # Conversion between pandas <> PySpark DataFrames
├── functions.py   # pandas_udf
├── group_ops.py   # Grouped UDF / Cogrouped UDF + groupby.apply, groupby.cogroup.apply
├── map_ops.py     # Map Iter UDF + mapInPandas
├── serializers.py # pandas <> PyArrow serializers
├── types.py       # Type utils between pandas <> PyArrow
└── utils.py       # Version requirement checks
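
For example, pandas_udf then lives in the new sub-package, while the public import path for users stays the same (illustrative):

# Internal module after this PR:
from pyspark.sql.pandas.functions import pandas_udf

# Public API, unchanged for end users:
from pyspark.sql.functions import pandas_udf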

In order to locate groupby.apply, groupby.cogroup.apply, mapInPandas, toPandas, and createDataFrame(pdf) separately under the pandas sub-package, I had to use a mixin approach, which the Scala side often uses via traits, and which pandas itself uses to group related functionality (see IndexOpsMixin as an example). Currently, you can think of it as Scala's self-typed trait. See the structure below:

class PandasMapOpsMixin(object):
    def mapInPandas(self, udf):
        ...

    # other pandas <> PySpark APIs

class DataFrame(PandasMapOpsMixin):
    # other DataFrame APIs equivalent to the Scala side
    ...

Yes, this is a big PR, but the changes are mostly just moving code around, except for one case, createDataFrame, where I had to split the method.

Why are the changes needed?

pandas functionality is scattered here and there, and I myself get lost finding it. Also, when you have to make a change that applies commonly across all the pandas-related features, it's almost impossible now.

Also, after this change, DataFrame and SparkSession become more consistent with the Scala side, since pandas is specific to Python; this change separates the pandas-specific APIs away from DataFrame and SparkSession.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests should cover this. I also manually built the PySpark API documentation and checked it.

@HyukjinKwon
Member Author

@BryanCutler and @ueshin, I think we talked about this once before. This PR aims to move everything related to pandas and PyArrow into one package, pandas. It's mostly just moving code around, although the changes are big.

@@ -185,248 +185,6 @@ def loads(self, obj):
raise NotImplementedError


class ArrowCollectSerializer(Serializer):
Member Author:

All classes are moved as-is to pyspark.sql.pandas.serializers.

@@ -2135,193 +2135,6 @@ def transform(self, func):
"should have been DataFrame." % type(result)
return result

@since(1.3)
def toPandas(self):
Member Author:

Moved to PandasConversionMixin under pyspark.sql.pandas.conversion

_check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
return pdf

def mapInPandas(self, udf):
Member Author:

Moved to PandasMapOpsMixin under pyspark.sql.pandas.map_ops.

@@ -2814,22 +2816,6 @@ def from_csv(col, schema, options={}):

# ---------------------------- User Defined Function ----------------------------------

class PandasUDFType(object):
Member Author:

Moved to pyspark.sql.pandas.functions

@@ -219,68 +219,6 @@ def pivot(self, pivot_col, values=None):
jgd = self._jgd.pivot(pivot_col, values)
return GroupedData(jgd, self._df)

@since(3.0)
Member Author:

Moved to PandasGroupedOpsMixin under pyspark.sql.pandas.group_ops


@@ -458,135 +459,6 @@ def _createFromLocal(self, data, schema):
data = [schema.toInternal(row) for row in data]
return self._sc.parallelize(data), schema

def _get_numpy_record_dtype(self, rec):
Member Author:

All moved to SparkConversionMixin under pyspark.sql.pandas.conversion

@@ -722,46 +594,12 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
except Exception:
has_pandas = False
if has_pandas and isinstance(data, pandas.DataFrame):
from pyspark.sql.utils import require_minimum_pandas_version
Member Author:

This one needs to be reviewed - I had to split the method.

@@ -1608,267 +1608,6 @@ def convert(self, obj, gateway_client):
register_input_converter(DateConverter())


def to_arrow_type(dt):
Member Author:

All moved to pyspark.sql.pandas.types

@HyukjinKwon
Member Author

cc @cloud-fan and @viirya for better visibility - it's also related to the pandas revisit work.

@HyukjinKwon HyukjinKwon changed the title [SPARK-30434][PYTHON][SQL] Move pandas related functionalities into pandas sub-package [SPARK-30434][PYTHON][SQL] Move pandas related functionalities into 'pandas' sub-package Jan 6, 2020


__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]


class DataFrame(object):
class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
Member:

Could you explain why mixins are used here? Since these are never used outside this context, nor (as far as I can tell) do they mimic the Scala counterpart, the approach seems a bit counterintuitive.

Wouldn't it make more sense to use proper instances, the same way as DataFrameNaFunctions or DataFrameStatFunctions?

This question also applies to GroupedData and its mixin.

@HyukjinKwon (Member Author), Jan 6, 2020:

Ah, you mean API usage like:

df.pandas.mapInPandas(...)

via defining an instance like DataFrameNaFunctions or DataFrameStatFunctions? I actually considered this approach first. If I could write it from scratch, I would consider this option without worrying about backward compatibility.
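
A rough sketch of that accessor-style alternative, analogous to DataFrameNaFunctions (hypothetical names; not what this PR implements):

class PandasFunctions(object):
    """Hypothetical accessor object holding pandas-specific APIs."""
    def __init__(self, df):
        self._df = df

    def mapInPandas(self, udf):
        ...

class DataFrame(object):
    @property
    def pandas(self):
        # Enables df.pandas.mapInPandas(udf)
        return PandasFunctions(self)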

If you mean defining DataFrameNaFunctions- or DataFrameStatFunctions-style classes and calling them directly from the API methods of DataFrame or SparkSession, the API names still have to be exposed at the DataFrame or SparkSession level, which is ugly.

Also, Scala developers are pretty used to mixins. If we think of it as a self-typed trait, I think it fits the purpose.

Lastly (trivial, but still), I wanted to make the code a bit more pandas-friendly by mimicking pandas' structure, in case we get more pandas developers involved.

Member:

In general, I am trying to get a better feel for the overall purpose of this refactoring.

As of now there is no indication that any of these mixins will ever be used outside their current context (DataFrame and GroupedData). That impression is further reinforced by the explicit type checks (here and here). So this doesn't really seem like a canonical use of a mixin, especially when the core DataFrame is not designed for extensibility.

Ah, you mean API usage like:

df.pandas.mapInPandas(...)

That's one possible approach, though not the one I was thinking about. I assumed (though I am not sure, since the amount of code moved, excluding docs, messages, and some static stuff, is negligible and tightly coupled with DataFrame anyway) that the point is maintainability.

So a possible approach is either direct:

    def __init__(self, ...):
        ...
        self._pandasMapOpsMixin = PandasMapOpsMixin(self)
        ...

    def mapInPandas(self, udf):
        return self._pandasMapOpsMixin.mapInPandas(udf)

or indirect (by overriding __getattr__).
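
For reference, a minimal sketch of that indirect __getattr__ delegation (hypothetical names; not the actual PySpark code):

class PandasMapOps(object):
    def __init__(self, df):
        self._df = df

    def mapInPandas(self, udf):
        ...

class DataFrame(object):
    def __init__(self):
        self._pandas_map_ops = PandasMapOps(self)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails; forward
        # pandas-specific names to the helper object.
        if name == "mapInPandas":
            return getattr(self._pandas_map_ops, name)
        raise AttributeError(name)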

@HyukjinKwon (Member Author), Jan 7, 2020:

Yeah,

    def __init__(self, ...):
        ...
        self._pandasMapOpsMixin = PandasMapOpsMixin(self)
        ...

    def mapInPandas(self, udf):
        return self._pandasMapOpsMixin.mapInPandas(udf)

With this one, DataFrame or SparkSession have to expose the methods themselves in their own classes. So, if you want to add a new pandas API, it has to be added to PandasMapOpsMixin and then also to DataFrame. Using __getattr__ looks like overkill to me, and a bit odd.

The point I wanted to make is that the current DataFrame with the mixin approach is closer to the Scala side's Dataset, since it does not expose such APIs itself.

Also, I believe what I am doing is what a self-typed trait is supposed to do: it's coupled to a specific type, and other types cannot implement the trait.

@HyukjinKwon (Member Author), Jan 7, 2020:

I meant:

class PandasMapOps(object):
    def __init__(self, df):
        self._df = df

    def mapInPandas(self, udf):
        ...

    # other pandas <> PySpark APIs

class DataFrame(object):
    def __init__(self):  # other constructor arguments elided
        self.pandas_map_ops = PandasMapOps(self)

    # other DataFrame APIs equivalent to the Scala side

    def mapInPandas(self, udf):
        return self.pandas_map_ops.mapInPandas(udf)

vs

class PandasMapOpsMixin(object):
    def mapInPandas(self, udf):
        ...

    # other pandas <> PySpark APIs

class DataFrame(PandasMapOpsMixin):
    # other DataFrame APIs equivalent to the Scala side
    ...

I thought the latter is better.

Member:

@cloud-fan

I think both are fine. It's internal so we can change it later.

It is hardly internal, considering that the mixed-in classes are "public" (as much as it is meaningful to speak of access control in Python) and, in contrast to the other changes proposed here, all the shifted non-static methods are part of the external API. The impact so far is small, though, if that's what you mean.

@HyukjinKwon

Also, I believe what I am doing is what self type trait is supposed to be doing. It's coupled to specific type and other types cannot implement this trait.

Such patterns, or their closest Python equivalents (see for example Django mixins for class-based views), typically indicate two things:

  • Potential for inheritance, which is clearly not the case, given both the Spark API and the design of the DataFrame class.
  • Non-obligatory character, which once again is not the case.

So I guess the question I am trying to ask is: what planned future changes justify such a move? As of now it seems mostly superfluous, and the lower-effort path, given the implied time pressure, would be to keep things as-is.

@HyukjinKwon (Member Author), Jan 7, 2020:

It is hardly internal, considering that the mixed-in classes are "public" (as much as it is meaningful to speak of access control in Python) and, in contrast to the other changes proposed here, all the shifted non-static methods are part of the external API. The impact so far is small, though, if that's what you mean.

Either way, the same APIs are exposed in the same class. It will also be easy to switch from one approach to the other with minimal change - I already roughly tested this.

Member:

Either way, the same APIs are exposed in the same class. It will also be easy to switch from one approach to the other with minimal change.

While that is true, it is not equivalent from the perspective of type checkers. And since a lot of the related discussion circles around typing, it might be something to consider.

Nonetheless, I can take a hint.

@HyukjinKwon (Member Author), Jan 7, 2020:

What I am trying to do is basically separate the pandas-related APIs from the Scala-equivalent APIs.

So, if you want to change a pandas-related API, you make the change in PandasMapOpsMixin.
If you need to change a Scala-equivalent API, you make the change in DataFrame.
Both are completely separate in the current approach. With the suggested approach, they would mix.

Contributor:

+1 on @HyukjinKwon's suggestion.


@dongjoon-hyun
Member

Retest this please.


@SparkQA

SparkQA commented Jan 7, 2020

Test build #116240 has finished for PR 27109 at commit c63c271.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a comment:

+1, having all the pandas functionality in one place will make things easier.

return pdf

@staticmethod
def _to_corrected_pandas_type(dt):
Member Author:

Here I piggyback a nit fix - I made this a staticmethod; it was previously a module-level function.

else:
return None

def _collect_as_arrow(self):
Member Author:

Here I piggyback another nit fix. It was _collectAsArrow; unless it's meant to match the Scala side, it should follow snake_case naming per PEP 8.

"""
from pyspark.sql.dataframe import DataFrame

assert isinstance(self, DataFrame)
Member Author:

Here, this assert was added. With it, IDEs can detect that self is a DataFrame.
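
To illustrate (a simplified sketch, not the exact code in this PR): the assert both enforces the self-typed contract at runtime and lets static analysis narrow the type of self.

class PandasMapOpsMixin(object):
    def mapInPandas(self, udf):
        from pyspark.sql.dataframe import DataFrame

        # This mixin is only ever mixed into DataFrame; the assert
        # documents and enforces that contract.
        assert isinstance(self, DataFrame)
        # From here on, tools can treat `self` as a DataFrame, so
        # DataFrame attributes resolve for completion and checking.
        ...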

@@ -179,6 +179,8 @@ def _supports_symlinks():
'pyspark.ml.linalg',
'pyspark.ml.param',
'pyspark.sql',
'pyspark.sql.avro',
Member Author:

Here, I piggyback another fix by adding pyspark.sql.avro. See https://github.com/apache/spark/pull/27109/files#r364017167

@@ -7,6 +7,7 @@ Module Context
.. automodule:: pyspark.sql
:members:
:undoc-members:
:inherited-members:
Member Author:

I checked every API to see if this affects the API documentation. It only affects Row, which inherits from tuple: it adds two APIs inherited from tuple, which seems legitimate.

globs['spark'] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.sql.pandas.conversion, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
Member Author:

I piggyback another change here - doctest.REPORT_NDIFF was added. Some files had it and some didn't, so I decided to add it consistently.

@@ -362,6 +362,13 @@ def __hash__(self):
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
"pyspark.sql.pandas.conversion",
Member Author:

BTW @BryanCutler, @icexelloss, @ueshin, please feel free to change package/module/class names separately if you don't like them. It will likely cause few or no conflicts with the work I'm doing.

@viirya (Member) left a comment:

Looks good, as I also sometimes get lost finding pandas-related stuff.

@dongjoon-hyun (Member) left a comment:

+1, LGTM.

@HyukjinKwon
Member Author

Thanks all. I will merge this tomorrow if I don't see any major concerns.

@HyukjinKwon
Member Author

Merged to master.

Mix-in for the conversion from pandas to Spark. Currently, only :class:`SparkSession`
can use this class.
"""
def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
Member:

Instead of shadowing SparkSession.createDataFrame and later having to disambiguate, it would be more idiomatic to just use name mangling, like this:

def __createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):

and later

return self._SparkConversionMixin__createDataFrame(...)

It would also be easier to add new mixins later, if that proves useful for some reason (here is a possible diff).

Additionally, we could limit the responsibility to data and schema preparation only (diff for this). On top of the benefits mentioned above, it would allow us to decouple the mixin from SparkSession (as we'd only need _wrapped._conf).
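
For readers unfamiliar with name mangling, a standalone toy sketch of the mechanics (hypothetical code, not the PySpark implementation):

class SparkConversionMixin(object):
    # Within the class body, __createDataFrame is stored under the
    # mangled name _SparkConversionMixin__createDataFrame.
    def __createDataFrame(self, data):
        return ("mixin", data)

class SparkSession(SparkConversionMixin):
    def createDataFrame(self, data):
        # The explicitly mangled name unambiguously selects this
        # mixin's helper, even if another mixin defined its own
        # __createDataFrame.
        return self._SparkConversionMixin__createDataFrame(data)

print(SparkSession().createDataFrame([1, 2]))  # ('mixin', [1, 2])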

Member Author:

I did it the current way so that SparkConversionMixin.createDataFrame can work on its own. I didn't like the second way because it's weird to return both schema and data.

Member Author:

_SparkConversionMixin__createDataFrame looks fine in that it can distinguish which mixin is used, but we don't really use name mangling anywhere in the current codebase either.

Member:

I did it the current way so that SparkConversionMixin.createDataFrame can work on its own

But it cannot, can it? It still has to be mixed into something that provides these methods.

I didn't like the second way because it's weird to return both schema and data.

Matter of taste I guess.

_SparkConversionMixin__createDataFrame looks fine in that it can distinguish which mixin is used, but we don't really use name mangling anywhere in the current codebase either.

That's true, though I am not aware of a case where that occurs yet. We use mixins for Params, but there conflicts are either unlikely or irrelevant. Here both methods are used, and it is theoretically possible that other, similar mixins might follow - say, pyspark.sql.ray.conversions or pyspark.sql.dask.conversions, either here or in third-party extensions. In such cases we might prefer a consistent interface.

Anyway, I don't have a very strong opinion about this. I mentioned it primarily because the corresponding type hints looked funky.

There is, however, one immediate side effect of the current approach: it makes passing method values around a bit unreliable (and, by extension, anything that depends on that, like monkey patching).

HyukjinKwon added a commit that referenced this pull request Jan 16, 2020
…ent in createDataFrame

### What changes were proposed in this pull request?

This is a followup of #27109. It should match the parameter lists in `createDataFrame`.

### Why are the changes needed?

To pass the parameters that were supposed to be passed.

### Does this PR introduce any user-facing change?

No (it's only in master)

### How was this patch tested?

Manually tested, and existing tests should cover it.

Closes #27225 from HyukjinKwon/SPARK-30434-followup.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@HyukjinKwon HyukjinKwon deleted the pandas-refactoring branch March 3, 2020 01:16