Replace HashSet with ConcurrentHashMap.newKeySet #3100

Open
wants to merge 1 commit into main
Conversation

aidar-stripe

What changes were proposed in this pull request?

Replace the HashSet of PartitionLocations with a concurrent version (ConcurrentHashMap.newKeySet).
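
Concretely, the swap looks roughly like the sketch below. Elem is a placeholder for the actual element type (the PartitionLocation sets mentioned above), and the field name is illustrative rather than the real Celeborn declaration:

import java.util.concurrent.ConcurrentHashMap

object ConcurrentSetSketch {
  type Elem = String // placeholder for PartitionLocation, illustration only

  // Before (illustrative): a plain HashSet is safe only when every access
  // is guarded by the same lock.
  // val locations: java.util.Set[Elem] = new java.util.HashSet[Elem]()

  // After (illustrative): a concurrent set view backed by ConcurrentHashMap,
  // which tolerates unsynchronized readers.
  val locations: java.util.Set[Elem] = ConcurrentHashMap.newKeySet[Elem]()
}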

Why are the changes needed?

We are seeing race conditions between handleGetReducerFileGroup and tryFinalCommit, where reducers complete without processing a partition even though it has data.

Problematic logs

On the driver side:

25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 commit files complete. File count 23200 using 240180 ms
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 partition 11931-0: primary lost, use replica PartitionLocation[
  id-epoch:11931-0
  host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
  mode:REPLICA
  peer:(empty)
  storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=}
  mapIdBitMap:null].
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to handle stageEnd for 23.

On the executor side:

25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 23 request reducer file group success using 59315 ms, result partition size 12000
...
25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 (TID 93846)
25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, partitionId=11931, stageId=74} INFO org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is changed to SORT because partition count 12000 is greater than threshold 2000
25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, taskAttemptId=93846} INFO org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 cost 0ms
25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: Shuffle data is empty for shuffle 23 partition 11931.

How was this patch tested?

No additional tests for this: I've tried to reproduce it, but we've only seen this happen with a high number of nodes and over long execution times.

More explanation on why/how this happens

// write path
override def setStageEnd(shuffleId: Int): Unit = {
  getReducerFileGroupRequest synchronized {
    stageEndShuffleSet.add(shuffleId)
  }
  ...

// read path
override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
  // Quick return for ended stage, avoid occupy sync lock.
  if (isStageEnd(shuffleId)) {
    replyGetReducerFileGroup(context, shuffleId)
  } else {
    getReducerFileGroupRequest.synchronized {
      ...

override def isStageEnd(shuffleId: Int): Boolean = {
  stageEndShuffleSet.contains(shuffleId)
}

Since the concurrency guarantees between the read and write paths are based on ConcurrentHashMap's volatile values, there is no guarantee that the contents of a plain HashSet will be fully visible to the reader thread.
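
To make the pattern concrete, here is a condensed, illustrative reduction of the two paths above (names mirror the excerpts, but this is not the actual Celeborn class): the writer adds under a lock, while the fast-path read in isStageEnd takes no lock, so visibility depends entirely on the set implementation.

import java.util.concurrent.ConcurrentHashMap

class StageEndTracker {
  private val lock = new Object
  // With a plain HashSet here, the unsynchronized read below would have no
  // happens-before edge with the synchronized write, so the reader could see
  // stale or partially published contents. ConcurrentHashMap.newKeySet makes
  // the lock-free read safe via the map's internal volatile accesses.
  private val stageEndShuffleSet: java.util.Set[Int] =
    ConcurrentHashMap.newKeySet[Int]()

  def setStageEnd(shuffleId: Int): Unit = lock.synchronized {
    stageEndShuffleSet.add(shuffleId)
  }

  // Fast path, mirroring isStageEnd: no lock taken.
  def isStageEnd(shuffleId: Int): Boolean =
    stageEndShuffleSet.contains(shuffleId)
}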

@FMX
Contributor

FMX commented Feb 17, 2025

@aidar-stripe Hi, PR #2986 "[CELEBORN-1769] Fix packed partition location cause GetReducerFileGroupResponse lose location" fixed the scenario you might have encountered.

Is that PR included in your distribution, or could you provide the commit ID of your worker distribution?

@aidar-stripe
Author

@FMX thanks for the link! I think you are absolutely right here: we were running a version of the Celeborn client based on 0.5.1 with some of our own commits for integrity checks (which were disabled).

I was able to confirm that the PbGetReducerFileGroupResponse conversion code only takes primaries there:

val fileGroup = pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
  case (partitionId, fileGroup) =>
    (
      partitionId,
      PbSerDeUtils.fromPbPackedPartitionLocationsPair(
        fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava)
}.asJava

This explains the consistency of the failures we've seen much better than the potential concurrency issue with the HashSet. I would still like to merge this PR though; I think a concurrent set is still more appropriate there.
