[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

## What changes were proposed in this pull request?

This is a long-standing bug in `UnsafeKVExternalSorter` that has been reported on the dev list multiple times.

When creating `UnsafeKVExternalSorter` with a `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter` to sort the data in the `BytesToBytesMap`. The data format of the sorter and the map is the same, so no data movement is required. However, both the sorter and the map need a pointer array for some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the pointer array between the sorter and the map, to avoid an extra memory allocation. This sounds reasonable: the length of the `BytesToBytesMap` pointer array is at least 4 times the number of keys (to avoid hash collisions, the hash table size should be at least 2 times the number of keys, and each key occupies 2 slots), and `UnsafeInMemorySorter` needs the pointer array size to be 4 times the number of entries, so reusing the pointer array seems safe.
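
To make the sizing argument concrete, here is a rough, self-contained arithmetic sketch (illustrative only; the key and entry counts are made up, and this is not Spark code):

```java
// Illustrative arithmetic only, using the sizing assumptions described above.
public class PointerArrayReuseCheck {
  public static void main(String[] args) {
    long numKeys = 16;                 // distinct keys stored in the map
    long numEntries = 65;              // total records; exceeds numKeys when keys repeat
    long mapArraySize = numKeys * 4;   // lower bound: >= 2x keys buckets, 2 slots per bucket
    long sorterNeeds = numEntries * 4; // 2 slots per record, plus equal free space for in-place sort
    // With all-distinct keys (numEntries == numKeys) reuse is safe; with duplicates it may not be.
    System.out.println("reuse safe: " + (sorterNeeds <= mapArraySize)); // prints: reuse safe: false
  }
}
```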

However, the number of keys in the map doesn't equal the number of entries, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption behind the above optimization: we may run out of space when inserting data into the sorter and hit this error:
```
java.lang.IllegalStateException: There is no space for new record
   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
   at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```

This PR fixes the bug by creating a new pointer array if the existing one is not big enough.
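
For orientation before reading the diff below, here is a condensed restatement of the added check as a hypothetical helper (the class and method names are invented for illustration; the `BytesToBytesMap` calls match the ones used in the actual change):

```java
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.map.BytesToBytesMap;

// Hypothetical helper; the real change is inline in UnsafeKVExternalSorter's constructor.
final class PointerArrayReuse {
  static LongArray pointerArrayFor(BytesToBytesMap map) {
    LongArray pointerArray = map.getArray();
    // The sorter needs 4 slots per record; the map only guarantees 4 slots per distinct key.
    if (map.numValues() > pointerArray.size() / 4) {
      // Allocate through the map so the memory manager won't ask the map itself to spill.
      pointerArray = map.allocateArray(map.numValues() * 4L);
    }
    return pointerArray;
  }
}
```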

## How was this patch tested?

A new test was added.

Author: Wenchen Fan <[email protected]>

Closes apache#20561 from cloud-fan/bug.
cloud-fan authored and Robert Kruszewski committed Feb 12, 2018
1 parent 29c970a commit 51c623d
Showing 2 changed files with 62 additions and 8 deletions.
UnsafeKVExternalSorter.java

```diff
@@ -34,6 +34,7 @@
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,19 +99,33 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
-      // During spilling, the array in map will not be used, so we can borrow that and use it
-      // as the underlying array for in-memory sorter (it's always large enough).
-      // Since we will not grow the array, it's fine to pass `null` as consumer.
+      // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
+      // that and use it as the pointer array for `UnsafeInMemorySorter`.
+      LongArray pointerArray = map.getArray();
+      // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since
+      // `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer
+      // array can hold all the entries in `BytesToBytesMap`.
+      // The pointer array will be used to do in-place sort, which requires half of the space to be
+      // empty. Note: each record in the map takes two entries in the pointer array, one is record
+      // pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`.
+      // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
+      // so that we can always reuse the pointer array.
+      if (map.numValues() > pointerArray.size() / 4) {
+        // Here we ask the map to allocate memory, so that the memory manager won't ask the map
+        // to spill, if the memory is not enough.
+        pointerArray = map.allocateArray(map.numValues() * 4L);
+      }
+
+      // Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed
+      // to be large enough, it's fine to pass `null` as consumer because we won't allocate more
+      // memory.
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         null,
         taskMemoryManager,
         comparatorSupplier.get(),
         prefixComparator,
-        map.getArray(),
+        pointerArray,
         canUseRadixSort);
 
       // We cannot use the destructive iterator here because we are reusing the existing memory
```
UnsafeKVExternalSorterSuite.scala

```diff
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.map.BytesToBytesMap
 
 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are a unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
+      new UnsafeKVExternalSorter(
+        schema,
+        schema,
+        sparkContext.env.blockManager,
+        sparkContext.env.serializerManager,
+        taskMemoryManager.pageSizeBytes(),
+        Int.MaxValue,
+        map)
+    } finally {
+      TaskContext.unset()
+    }
+  }
 }
```