
[SPARK-30681][PYSPARK][SQL] Add higher order functions API to PySpark #27406

Closed
wants to merge 11 commits

Conversation

zero323
Member

@zero323 zero323 commented Jan 30, 2020

What changes were proposed in this pull request?

This PR adds a Python API for invoking the following higher order functions:

  • transform
  • exists
  • forall
  • filter
  • aggregate
  • zip_with
  • transform_keys
  • transform_values
  • map_filter
  • map_zip_with

to pyspark.sql. Each of these accepts a plain Python function of one of the following types:

  • (Column) -> Column: ...
  • (Column, Column) -> Column: ...
  • (Column, Column, Column) -> Column: ...

Internally this proposal piggybacks on the objects supporting the Scala implementation (SPARK-27297) by:

  1. Creating the required UnresolvedNamedLambdaVariables and exposing them as PySpark Columns.
  2. Invoking the Python function with these columns as arguments.
  3. Using the result, and the underlying JVM objects from step 1, to create an expressions.LambdaFunction, which is passed to the desired expression and repacked as a Python Column (see the sketch below).
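
For intuition, here is a minimal, Spark-free sketch of that flow. Plain strings stand in for the Column-wrapped JVM lambda variables, and the function name below is illustrative rather than the actual implementation:

```python
import inspect

def invoke_higher_order_function(name, col, f):
    # Step 1: create one placeholder per parameter of the user-supplied function
    # (the real code wraps UnresolvedNamedLambdaVariable objects in Columns).
    arity = len(inspect.signature(f).parameters)
    placeholders = ["x", "y", "z"][:arity]
    # Step 2: invoke the plain Python function with the placeholders as arguments.
    body = f(*placeholders)
    # Step 3: combine the body and the placeholders into the target expression
    # (the real code builds expressions.LambdaFunction through py4j instead).
    return "{}({}, ({}) -> {})".format(name, col, ", ".join(placeholders), body)

# With string placeholders the lambda has to produce a string as well:
print(invoke_higher_order_function("transform", "values", lambda x: "{} + 1".format(x)))
# transform(values, (x) -> x + 1)
```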

Why are the changes needed?

Currently higher order functions are available only through the SQL and Scala APIs, and from Python they can be used only through SQL expressions:

df.selectExpr("transform(values, x -> x + 1)")

This works reasonably well for simple functions, but can get really ugly with complex functions (conditional logic, casts): the resulting expressions are somewhat verbose and we don't get any IDE support. Additionally, the lambda DSL, though very simple, is not documented.

With the changes proposed here, the above query could be rewritten as:

df.select(transform("values", lambda x: x + 1))
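
A slightly fuller, runnable sketch of the proposed API, assuming a Spark build that includes this patch; the DataFrame and column names are made up for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import exists, filter, transform

spark = SparkSession.builder.master("local[1]").appName("hof-example").getOrCreate()
df = spark.createDataFrame([(1, [1, 2, 3, 4])], ("key", "values"))

df.select(
    transform("values", lambda x: x + 1).alias("plus_one"),   # [2, 3, 4, 5]
    filter("values", lambda x: x % 2 == 0).alias("even"),     # [2, 4]
    exists("values", lambda x: x > 3).alias("any_gt_3"),      # true
).show()

spark.stop()
```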

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • For positive cases this PR adds doctest strings covering possible usage patterns.
  • For negative cases (unsupported function types) this PR adds unit tests.

Notes

If approved, the same approach can be used in SparkR.

@nchammas
Contributor

df.selectExpr("transform(values, x -> x + 1)")

So with this PR, this can be rewritten as follows, correct?

from pyspark.sql.functions import transform

df.select(transform('values', lambda x: x + 1))

Does this PR introduce any user-facing change?

No.

Since this PR adds new public PySpark APIs, shouldn't this be "yes"? Or is that only when modifying an existing public API?

@SparkQA

SparkQA commented Jan 30, 2020

Test build #117579 has finished for PR 27406 at commit 7747006.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zero323
Member Author

zero323 commented Jan 30, 2020

df.selectExpr("transform(values, x -> x + 1)")

So with this PR, this can be rewritten as follows, correct?

from pyspark.sql.functions import transform

df.select(transform('values', lambda x: x + 1))

That's exactly it. I think it is much easier to write, and while there is some potential for confusion (not really different from the Scala variant), the benefits outweigh it.

Does this PR introduce any user-facing change?

No.

Since this PR adds new public PySpark APIs, shouldn't this be "yes"? Or is that only when modifying an existing public API?

Description says:

If yes, please clarify the previous behavior and the change this PR proposes

Since there was no "previous behavior", I've opted for no.

and can use methods of :class:`pyspark.sql.Column`, functions defined in
:py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
Python ``UserDefinedFunctions`` are not supported
(`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
Member Author

If SPARK-27052 is a won't-fix, we could be more proactive and inspect the closure here:

import inspect
from itertools import chain
from pyspark.sql.udf import UserDefinedFunction


def _is_udf(f):
    return isinstance(f, UserDefinedFunction) or (
        hasattr(f, "returnType")
        and hasattr(f, "evalType")
        and f.__closure__
        and isinstance(f.__closure__[0].cell_contents, UserDefinedFunction)
    )


def _check_contains_udf(f):
    closurevars = inspect.getclosurevars(f)
    for name, g in chain(closurevars.nonlocals.items(), closurevars.globals.items()):
        if _is_udf(g):
            raise ValueError(
                "Higher order functions cannot use UserDefinedFunctions. "
                "Detected following udf {}: {}".format(name, g)
            )

or throw an exception at the analyzer level
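
For context, this is the kind of usage such a check would aim to reject (plus_one and will_fail are made-up names, and nothing below is actually executed against Spark):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

plus_one = udf(lambda x: x + 1, LongType())

def will_fail(x):
    # References a Python UDF from the lambda body; higher order functions
    # cannot execute Python UserDefinedFunctions (SPARK-27052).
    return plus_one(x)

# _check_contains_udf(will_fail) would then raise ValueError, because
# inspect.getclosurevars(will_fail).globals contains plus_one, which
# _is_udf recognizes as a (wrapped) UserDefinedFunction.
```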

... ("key", "values")
... )
>>> def after_second_quarter(x):
...     return month(to_date(x)) > 6
Contributor

Just curious: Do we have doc tests, and do they import all the functions for this to work?

Member Author

We do. As far as I remember all modules invoke doctests when executed as __main__. Here's the relevant part of functions.py:

def _test():
    import doctest
    import sys
    from pyspark.sql import Row, SparkSession
    import pyspark.sql.functions
    globs = pyspark.sql.functions.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.functions tests")\
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
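
So a doctest for one of the new functions would look roughly like this, relying on the spark global set up above (a sketch only; the examples actually added by this PR may differ):

```python
>>> from pyspark.sql.functions import transform
>>> df = spark.createDataFrame([(1, [1, 2, 3, 4])], ("key", "values"))
>>> df.select(transform("values", lambda x: x * 2).alias("doubled")).first()
Row(doubled=[2, 4, 6, 8])
```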

@SparkQA

SparkQA commented Jan 30, 2020

Test build #117585 has finished for PR 27406 at commit cbbdded.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 30, 2020

Test build #117590 has finished for PR 27406 at commit 95dbda5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Nice, I wanted to have some time to look at this but couldn't. Thanks for working on this.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117614 has finished for PR 27406 at commit 9993b82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117655 has finished for PR 27406 at commit ac91cd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117657 has finished for PR 27406 at commit ba84aeb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117660 has finished for PR 27406 at commit 4e95e12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 2, 2020

Test build #117722 has finished for PR 27406 at commit d3e1287.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 2, 2020

Test build #117723 has finished for PR 27406 at commit e60c589.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zero323
Member Author

zero323 commented Feb 2, 2020

As a fellow Pythonista, do you have any thoughts about #27245 @nchammas?

return parameters


def _get_lambda_parameters_legacy(f):
Member Author

Now that this targets 3.1, this shouldn't be necessary anymore, though we still have to wait for the official resolution of SPARK-29909.
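
For reference, a minimal sketch of what the non-legacy path boils down to; the exact names and error messages are illustrative, not necessarily the PR's code:

```python
import inspect

_SUPPORTED_KINDS = {
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
}

def _get_lambda_parameters(f):
    parameters = inspect.signature(f).parameters.values()
    # Placeholders (x, y, z) are generated positionally, so only one to three
    # plain positional parameters can be mapped onto the JVM lambda variables.
    if not (1 <= len(parameters) <= 3):
        raise ValueError("f should take between 1 and 3 arguments, got {}".format(len(parameters)))
    if any(p.kind not in _SUPPORTED_KINDS for p in parameters):
        raise ValueError("f should use only POSITIONAL or POSITIONAL-OR-KEYWORD arguments")
    return parameters
```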

@SparkQA

SparkQA commented Feb 4, 2020

Test build #117794 has finished for PR 27406 at commit 05741c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

)


def _get_lambda_parameters(f):
Member

@zero323, can we remove _get_lambda_parameters and _get_lambda_parameters_legacy here too for now? Let's discuss and figure out a better way in the next PR about this.

Member Author

Sorry, but I don't see this.

  • Argument type validation is not covered by the planner, and some combinations of argument types will lead either to cryptic exceptions or to ambiguous behavior. How would you generate placeholders, for example, for *args (or similarly ... in R)?
  • Having placeholders consistent with the Scala counterpart is good. And even getting to x1, x2, ... cannot be done without inspecting the signature. And even if we did only that (which still keeps most of the complexity), calling things with (**kwargs) or (*args) or different keyword variants would be rather sloppy (see the small inspect demo below).
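
For instance, this is the distinction the signature inspection has to make, independent of Spark:

```python
import inspect

def unary(x): ...
def variadic(*args): ...

print([p.kind.name for p in inspect.signature(unary).parameters.values()])
# ['POSITIONAL_OR_KEYWORD'] -> one placeholder can be generated
print([p.kind.name for p in inspect.signature(variadic).parameters.values()])
# ['VAR_POSITIONAL'] -> no fixed arity, so no sensible set of placeholders
```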

Member

I am not particularly against this code. I just thought it's easier to logically separate the two PRs: this PR just adds higher order functions, relying on the JVM checks; the other PR adds argument checking on the Python (or R) side.

For the former I am totally supportive and I can sign off. For the latter I am not completely sure; I would like to collect feedback from some more people. So asking to remove it is just to get this rolling quicker :-).

Member Author

I get your point, but right now I consider this core logic. We can shuffle things around and shave a line or two, but I don't see it improving overall "reviewability".

Somewhere here we have to decide how many placeholders should be provided (if it wasn't for the polymorphic behavior of some variants, we could just assume it ‒ if we, for example, had separate transform and transform_with_index. But now we have to do it for at least a few of the functions, and there might be more to come). So we can only push things around, and

    import inspect

    signature = inspect.signature(f)
    parameters = signature.parameters.values()

as well as the legacy part (which I hope is going to be dropped before the 3.1 release anyway) are here to stay (they could be inlined, but personally I think it would make things worse, not better).

If we wanted to condense things as much as possible, it could be condensed into something like

    args = [
        _unresolved_named_lambda_variable(f"x{i}")
        for i, _ in enumerate(
            inspect.signature(f).parameters if sys.version_info >= (3, 3)
            else inspect.getargspec(f).args
        )
    ]

but I don't think that makes things any clearer, do you? And it still leaves the part below.

If we keep just this part and leave variadic argument validation out, I'd be concerned that it will lead to all kinds of confusing or potentially incorrect behaviors. Users are extremely imaginative, and delayed resolution doesn't help*.

And since it is neither heavy in logic nor duplicates (for some definition of duplication) JVM logic, it shouldn't be controversial.


* We can be pretty sure that attempts to apply some general-purpose library in func will happen, if this is merged.

And let's be honest ‒ higher order functions are already pretty hard to work with. They are sensitive to minor type mismatches (numeric precision and such), and the corresponding expressions are pretty large.
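
For instance, one classic type-sensitivity gotcha looks like this (a sketch assuming Spark 3.1+; the exact error message varies between versions):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import aggregate, lit

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, [20.0, 4.0, 2.0])], ("id", "values"))

# Fine: the zero value and the array elements are both doubles.
df.select(aggregate("values", lit(0.0), lambda acc, x: acc + x).alias("total")).show()

# Typically rejected at analysis time: lit(0) is an integer, while the merge
# lambda returns a double, so the accumulator types do not line up.
df.select(aggregate("values", lit(0), lambda acc, x: acc + x).alias("total")).show()
```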

Member Author

Just to be clear ‒ my reservations are now purely practical (in contrast to my opposition to removing function-specific validation). Certain things have to be done one way or another, and some parts of the logic will be removed soon, so it is better to keep these clearly separated. Preventing ambiguity while keeping things parallel with Scala is just basic sanity :)

Member

@HyukjinKwon HyukjinKwon left a comment

Looks fine. Let me merge in a few days after taking a final look, if other committers can't find some time to review.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

Okay, let's merge this in. I will take a separate look for a followup if it's needed.

@SparkQA

SparkQA commented Feb 25, 2020

Test build #118892 has finished for PR 27406 at commit 05741c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

HyukjinKwon pushed a commit that referenced this pull request Feb 28, 2020
…tions

### What changes were proposed in this pull request?

This PR adds an R API for invoking the following higher order functions:

- `transform` -> `array_transform` (to avoid conflict with `base::transform`).
- `exists` -> `array_exists` (to avoid conflict with `base::exists`).
- `forall` -> `array_forall` (no conflicts, renamed for consistency)
- `filter` -> `array_filter` (to avoid conflict with `stats::filter`).
- `aggregate` -> `array_aggregate` (to avoid conflict with `stats::aggregate`).
- `zip_with` -> `arrays_zip_with` (no conflicts, renamed for consistency)
- `transform_keys`
- `transform_values`
- `map_filter`
- `map_zip_with`

Overall implementation follows the same pattern as proposed for PySpark (#27406) and reuses the objects supporting the Scala implementation (SPARK-27297).

### Why are the changes needed?

Currently higher order functions are available only through the SQL and Scala APIs, and from R they can be used only through SQL expressions:

```r
select(df, expr("transform(xs, x -> x + 1)"))
```

This is error-prone, and hard to do right, when complex logic is used (`when` / `otherwise`, complex objects).

If this PR is accepted, the above could simply be rewritten as:

```r
select(df, array_transform("xs", function(x) x + 1))
```

### Does this PR introduce any user-facing change?

No (but new user-facing functions are added).

### How was this patch tested?

Added new unit tests.

Closes #27433 from zero323/SPARK-30682.

Authored-by: zero323 <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@zero323
Member Author

zero323 commented Feb 28, 2020

Thank you @nchammas @HyukjinKwon.

@zero323 zero323 deleted the SPARK-30681 branch February 28, 2020 10:54
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
HyukjinKwon added a commit that referenced this pull request Jan 6, 2021
…e in higher order functions

### What changes were proposed in this pull request?

This PR is a followup of #27406. It fixes the naming to match the Scala side.

Note that there is a bit of inconsistency already, e.g. `col`, `e`, `expr` and `column`. This part I did not change, but other names like `zero` vs `initialValue` or `col1`/`col2` vs `left`/`right` look unnecessary.

### Why are the changes needed?

To make the usage similar to the Scala side, and for consistency.

### Does this PR introduce _any_ user-facing change?

No, this is not released yet.

### How was this patch tested?

GitHub Actions and Jenkins build will test it out.

Closes #31062 from HyukjinKwon/SPARK-30681.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jan 6, 2021
(cherry picked from commit ff284fb)
Signed-off-by: HyukjinKwon <[email protected]>