Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 615 map partitions with index callable from java #16

Conversation

holdenk
Copy link
Contributor

@holdenk holdenk commented Feb 27, 2014

No description provided.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12894/

@mateiz
Copy link
Contributor

mateiz commented Mar 4, 2014

Hey Holden, wait on this a bit until #17 is merged. Then we'll also want to make sure it works with Java 8 (you'll need to make the class an interface and such).

@pwendell
Copy link
Contributor

pwendell commented Mar 8, 2014

@holdenk mind bumping this now that #17 is in? You'll have to change extends to with... since the function classes are now interfaces rather than abstract classes.

@holdenk
Copy link
Contributor Author

holdenk commented Mar 8, 2014

Sure, I'll give this a shot today :)

On Sat, Mar 8, 2014 at 11:24 AM, Patrick Wendell
[email protected]:

@holdenk https://github.com/holdenk mind bumping this now that #17https://github.com/apache/spark/pull/17is in? You'll have to change
extends to with... since the function classes are now interfaces rather
than abstract classes.

Reply to this email directly or view it on GitHubhttps://github.com//pull/16#issuecomment-37107006
.

Cell : 425-233-8271

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13075/

@JoshRosen
Copy link
Contributor

Sorry to necro the oldest open PR, but do you mind closing this now that mapPartitionsWithIndex has been fixed? Thanks!

@holdenk holdenk closed this Aug 24, 2014
jackylk pushed a commit to jackylk/spark that referenced this pull request Nov 8, 2014
JasonMWhite pushed a commit to JasonMWhite/spark that referenced this pull request Dec 2, 2015
Fix java.util.MissingFormatArgumentException in statsd module
asfgit pushed a commit that referenced this pull request Mar 29, 2016
## What changes were proposed in this pull request?

This PR brings the support for chained Python UDFs, for example

```sql
select udf1(udf2(a))
select udf1(udf2(a) + 3)
select udf1(udf2(a) + udf3(b))
```

Also directly chained unary Python UDFs are put in single batch of Python UDFs, others may require multiple batches.

For example,
```python
>>> sqlContext.sql("select double(double(1))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#10 AS double(double(1))#9]
:     +- INPUT
+- !BatchPythonEvaluation double(double(1)), [pythonUDF#10]
   +- Scan OneRowRelation[]
>>> sqlContext.sql("select double(double(1) + double(2))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16]
:     +- INPUT
+- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19]
   +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18]
      +- !BatchPythonEvaluation double(1), [pythonUDF#17]
         +- Scan OneRowRelation[]
```

TODO: will support multiple unrelated Python UDFs in one batch (another PR).

## How was this patch tested?

Added new unit tests for chained UDFs.

Author: Davies Liu <[email protected]>

Closes #12014 from davies/py_udfs.
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
…-1656 to netflix/1.6.1

* commit '5b54d2fbb11b45298440d77deb06514f12c47b40':
  [DSEPLAT-1656] Upgrade the version of metacat client, benjamin and bdurl.
AnthonyTruchet added a commit to AnthonyTruchet/spark that referenced this pull request Dec 12, 2016
Fix dev tools and add some new, Criteo specific ones.
lins05 pushed a commit to lins05/spark that referenced this pull request Jan 17, 2017
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime
lins05 pushed a commit to lins05/spark that referenced this pull request Apr 23, 2017
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime
icexelloss added a commit to icexelloss/spark that referenced this pull request Apr 28, 2017
Move column writers to Arrow.scala

Add support for more types; Switch to arrow NullableVector

closes apache#16
erikerlandson pushed a commit to erikerlandson/spark that referenced this pull request Jul 28, 2017
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime
sven0726 pushed a commit to sven0726/spark that referenced this pull request Dec 3, 2018
hn5092 added a commit to hn5092/spark that referenced this pull request Apr 25, 2019
 upgrade spark version to 2.4.1-kylin-r5
hn5092 added a commit to hn5092/spark that referenced this pull request Jul 17, 2019
 upgrade spark version to 2.4.1-kylin-r5
hn5092 added a commit to hn5092/spark that referenced this pull request Jul 18, 2019
SirOibaf added a commit to SirOibaf/spark that referenced this pull request Jun 11, 2020
ringtail added a commit to ringtail/spark that referenced this pull request Jan 21, 2021
redsanket pushed a commit to redsanket/spark that referenced this pull request Feb 16, 2021
[YSPARK-1523] Cleanup hbaseread.py
HyukjinKwon pushed a commit that referenced this pull request Apr 22, 2023
…onnect

### What changes were proposed in this pull request?
Implement Arrow-optimized Python UDFs in Spark Connect.

Please see #39384 for motivation and  performance improvements of Arrow-optimized Python UDFs.

### Why are the changes needed?
Parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
Yes. In Spark Connect Python Client, users can:

1. Set `useArrow` parameter True to enable Arrow optimization for a specific Python UDF.

```sh
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```

2. Enable `spark.sql.execution.pythonUDF.arrow.enabled` Spark Conf to make all Python UDFs Arrow-optimized.

```sh
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

```

### How was this patch tested?
Parity unit tests.

Closes #40725 from xinrong-meng/connect_arrow_py_udf.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
risyomei pushed a commit to risyomei/spark that referenced this pull request Jun 26, 2023
srowen pushed a commit that referenced this pull request Jan 28, 2025
This is a trivial change to replace the loop index from `int` to `long`. Surprisingly, microbenchmark shows more than double performance uplift.

Analysis
--------
The hot loop of `arrayEquals` method is simplifed as below. Loop index `i` is defined as `int`, it's compared with `length`, which is a `long`, to determine if the loop should end.
```
public static boolean arrayEquals(
    Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
  ......
  int i = 0;
  while (i <= length - 8) {
    if (Platform.getLong(leftBase, leftOffset + i) !=
        Platform.getLong(rightBase, rightOffset + i)) {
          return false;
    }
    i += 8;
  }
  ......
}
```

Strictly speaking, there's a code bug here. If `length` is greater than 2^31 + 8, this loop will never end because `i` as a 32 bit integer is at most 2^31 - 1. But compiler must consider this behaviour as intentional and generate code strictly match the logic. It prevents compiler from generating optimal code.

Defining loop index `i` as `long` corrects this issue. Besides more accurate code logic, JIT is able to optimize this code much more aggressively. From microbenchmark, this trivial change improves performance significantly on both Arm and x86 platforms.

Benchmark
---------
Source code:
https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a

Result on Arm Neoverse N2:
```
Benchmark                             Mode  Cnt    Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  674.313 ± 0.213  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10  313.563 ± 2.338  ns/op
```

Result on Intel Cascake Lake:
```
Benchmark                             Mode  Cnt     Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  1130.695 ± 0.168  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10   461.979 ± 0.097  ns/op
```

Deep dive
---------
Dive deep to the machine code level, we can see why the big gap. Listed below are arm64 assembly generated by Openjdk-17 C2 compiler.

For `int i`, the machine code is similar to source code, no deep optimization. Safepoint polling is expensive in this short loop.
```
// jit c2 machine code snippet
  0x0000ffff81ba8904:   mov        w15, wzr              // int i = 0
  0x0000ffff81ba8908:   nop
  0x0000ffff81ba890c:   nop
loop:
  0x0000ffff81ba8910:   ldr        x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i)
  0x0000ffff81ba8914:   ldr        x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i)
  0x0000ffff81ba8918:   cmp        x10, x14
  0x0000ffff81ba891c:   b.ne       0x0000ffff81ba899c    // return false if not equal
  0x0000ffff81ba8920:   ldr        x14, [x28, #848]      // x14 -> safepoint
  0x0000ffff81ba8924:   add        w15, w15, #0x8        // i += 8
  0x0000ffff81ba8928:   ldr        wzr, [x14]            // safepoint polling
  0x0000ffff81ba892c:   sxtw       x10, w15              // extend i to long
  0x0000ffff81ba8930:   cmp        x10, x11
  0x0000ffff81ba8934:   b.le       0x0000ffff81ba8910    // if (i <= length - 8) goto loop
```

For `long i`, JIT is able to do much more aggressive optimization. E.g, below code snippet unrolls the loop by four.
```
// jit c2 machine code snippet
unrolled_loop:
  0x0000ffff91de6fe0:   sxtw       x10, w7
  0x0000ffff91de6fe4:   add        x23, x22, x10
  0x0000ffff91de6fe8:   add        x24, x21, x10
  0x0000ffff91de6fec:   ldr        x13, [x23]          // unroll-1
  0x0000ffff91de6ff0:   ldr        x14, [x24]
  0x0000ffff91de6ff4:   cmp        x13, x14
  0x0000ffff91de6ff8:   b.ne       0x0000ffff91de70a8
  0x0000ffff91de6ffc:   ldr        x13, [x23, #8]      // unroll-2
  0x0000ffff91de7000:   ldr        x14, [x24, #8]
  0x0000ffff91de7004:   cmp        x13, x14
  0x0000ffff91de7008:   b.ne       0x0000ffff91de70b4
  0x0000ffff91de700c:   ldr        x13, [x23, #16]     // unroll-3
  0x0000ffff91de7010:   ldr        x14, [x24, #16]
  0x0000ffff91de7014:   cmp        x13, x14
  0x0000ffff91de7018:   b.ne       0x0000ffff91de70a4
  0x0000ffff91de701c:   ldr        x13, [x23, #24]     // unroll-4
  0x0000ffff91de7020:   ldr        x14, [x24, #24]
  0x0000ffff91de7024:   cmp        x13, x14
  0x0000ffff91de7028:   b.ne       0x0000ffff91de70b0
  0x0000ffff91de702c:   add        w7, w7, #0x20
  0x0000ffff91de7030:   cmp        w7, w11
  0x0000ffff91de7034:   b.lt       0x0000ffff91de6fe0
```

### What changes were proposed in this pull request?

A trivial change to replace loop index `i` of method `arrayEquals` from `int` to `long`.

### Why are the changes needed?

To improve performance and fix a possible bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49568 from cyb70289/arrayEquals.

Authored-by: Yibo Cai <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants