Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1728] Fix NPE when failing to connect to celeborn worker #2933

Closed
wants to merge 1 commit into from

Conversation

turboFei
Copy link
Member

@turboFei turboFei commented Nov 20, 2024

What changes were proposed in this pull request?

Fix NPE. When failed to connect to celeborn worker, the currentReader might be null.

Why are the changes needed?

I am testing #2921 in the celeborn cluster.

And set the celeborn.data.io.connectionTimeout to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null.

image

https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672

24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
java.lang.NullPointerException
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

GA.

@turboFei turboFei changed the title fix npe [CELEBORN-1728] Fix NPE when failing to connect to celeborn worker Nov 20, 2024
@turboFei turboFei requested review from cxzl25, RexXiong and FMX November 20, 2024 23:51
Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM

Copy link
Contributor

@FMX FMX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks. Merged into main(v0.6.0), branch-0.5(v0.5.2).

@FMX FMX closed this in 094fe28 Nov 21, 2024
FMX pushed a commit that referenced this pull request Nov 21, 2024
### What changes were proposed in this pull request?
Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`.

### Why are the changes needed?

I am testing #2921 in the celeborn cluster.

And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null.

<img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff">

https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672

```
24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
java.lang.NullPointerException
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #2933 from turboFei/npe_reader.

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 094fe28)
Signed-off-by: mingji <[email protected]>
@turboFei turboFei deleted the npe_reader branch November 21, 2024 08:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants