Skip to content

Commit

Permalink
[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not …
Browse files Browse the repository at this point in the history
…access released memory page (branch-2.2)

## What changes were proposed in this pull request?

Backport apache#22062 to branch-2.2. There are just two minor differences in the test:

- branch-2.2 doesn't have `SparkOutOfMemoryError`. It's using `OutOfMemoryError` directly.
- MockitoSugar is in a different package in old scalatest.

## How was this patch tested?

Jenkins

Closes apache#22072 from zsxwing/SPARK-25081-2.2.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
  • Loading branch information
zsxwing authored and sumwale committed Oct 11, 2021
1 parent 0e13b00 commit e6a5df5
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int usableCapacity = 0;

private int initialSize;
private final int initialSize;

ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
this.consumer = consumer;
Expand Down Expand Up @@ -95,12 +95,20 @@ public int numRecords() {
}

/**
 * Clears this sorter so it can be reused: the write position goes back to 0 and, when a
 * consumer is present, the current pointer array is freed and replaced by a freshly allocated
 * array of `initialSize` entries.
 */
public void reset() {
// Reset `pos` before touching the array so that a `spill` triggered by the `allocateArray`
// call below is a no-op.
pos = 0;
if (consumer != null) {
consumer.freeArray(array);
// `array` has just been released, so set it to `null` to make sure nobody accesses it before
// `allocateArray` returns. `usableCapacity` is also set to `0` to prevent any code from
// writing data to `ShuffleInMemorySorter` while `array` is `null` (e.g., in
// ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
// `ShuffleInMemorySorter` when `allocateArray` throws OutOfMemoryError).
array = null;
usableCapacity = 0;
// If this allocation throws, the two assignments below it are skipped and the sorter keeps
// the null array / zero capacity set above.
array = consumer.allocateArray(initialSize);
usableCapacity = getUsableCapacity();
}
// NOTE(review): this repeats the assignment at the top of the method. It looks like the
// pre-patch line kept by the diff view - confirm against the real file before relying on it.
pos = 0;
}

public void expandPointerArray(LongArray newArray) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import java.lang.{Long => JLong}

import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

import org.apache.spark._
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.memory._
import org.apache.spark.unsafe.Platform

class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

  test("nested spill should be no-op") {
    // Run with a tiny, fully usable memory budget so that spills are easy to provoke.
    val sparkConf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("ShuffleExternalSorterSuite")
      .set("spark.testing", "true")
      .set("spark.testing.memory", "1600")
      .set("spark.memory.fraction", "1")
    sc = new SparkContext(sparkConf)

    val unifiedManager = UnifiedMemoryManager(sparkConf, 1)

    // Flipped to true once the sorter is full; from then on the stubbed task memory manager
    // below grabs whatever memory a spill frees up.
    var stealFreedMemory = false

    // Execution memory currently held by nobody. Deliberately a `def`: it has to be re-read on
    // every use because the value changes as memory is acquired and released.
    def unclaimedBytes: Long = unifiedManager.maxHeapMemory - unifiedManager.executionMemoryUsed

    // A `TaskMemoryManager` that, while `stealFreedMemory` is set, first hands the free memory
    // to another task attempt and only then serves the real request. This triggers a nested
    // spill and exposes issues if that case is not handled properly.
    val stealingTaskMemoryManager = new TaskMemoryManager(unifiedManager, 0) {
      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to
        // use, so 400 bytes are always left over for the task.
        if (stealFreedMemory && unclaimedBytes > 400) {
          // Looked up by name and invoked reflectively (presumably because the method is not
          // accessible from this package - TODO confirm).
          val acquireMethod =
            unifiedManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
          acquireMethod.invoke(
            unifiedManager,
            JLong.valueOf(unclaimedBytes - 400),
            JLong.valueOf(1L), // taskAttemptId
            MemoryMode.ON_HEAP
          ).asInstanceOf[java.lang.Long]
        }
        super.acquireExecutionMemory(required, consumer)
      }
    }

    val metrics = new TaskMetrics
    val mockedContext = mock[TaskContext]
    when(mockedContext.taskMetrics()).thenReturn(metrics)

    val sorter = new ShuffleExternalSorter(
      stealingTaskMemoryManager,
      sc.env.blockManager,
      mockedContext,
      100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
      1, // numPartitions
      sparkConf,
      new ShuffleWriteMetrics)

    // The in-memory sorter is a private field of `ShuffleExternalSorter`; read it reflectively
    // so the test can tell when the pointer array is full.
    val pointerSorter = {
      val inMemSorterField = sorter.getClass.getDeclaredField("inMemSorter")
      inMemSorterField.setAccessible(true)
      inMemSorterField.get(sorter).asInstanceOf[ShuffleInMemorySorter]
    }

    val oneByte = new Array[Byte](1)
    def insertOneByteRecord(): Unit =
      sorter.insertRecord(oneByte, Platform.BYTE_ARRAY_OFFSET, 1, 0)

    // Fill the sorter so that the next `insertRecord` call triggers a spill.
    while (pointerSorter.hasSpaceForAnotherRecord) {
      insertOneByteRecord()
    }

    // From now on the stubbed TaskMemoryManager acquires the free memory released by a spill,
    // which triggers a nested spill.
    stealFreedMemory = true

    // Should throw `OutOfMemoryError` as there is not enough memory: `ShuffleInMemorySorter`
    // will try to acquire 800 bytes but only 400 bytes are available.
    //
    // Before the fix, a nested spill could use a released page, which makes two tasks access
    // the same memory page. When a task reads memory written by another task, many types of
    // failures may happen. Here are some examples we have seen:
    //
    // - JVM crash. (This is easy to reproduce in the unit test as we fill newly allocated and
    //   deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory
    //   address)
    // - java.lang.IllegalArgumentException: Comparison method violates its general contract!
    // - java.lang.NullPointerException
    //     at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
    // - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912
    //   because the size after growing exceeds size limitation 2147483632
    intercept[OutOfMemoryError] {
      insertOneByteRecord()
    }
  }
}

0 comments on commit e6a5df5

Please sign in to comment.