creating UnsafeKVExternalSorter with BytesToBytesMap may fail
cloud-fan committed Feb 9, 2018
1 parent 76e019d commit 51d381f
Showing 2 changed files with 55 additions and 5 deletions.
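In short: `UnsafeKVExternalSorter` can borrow a `BytesToBytesMap`'s point array as the backing array for its in-memory sorter, but that array is only guaranteed to hold all distinct keys, while the sorter needs room for every entry. A map with many duplicated keys can therefore overflow it. The fix replaces the old `numKeys()`-based assertion with a `numValues()`-based check that allocates a larger array when the borrowed one would be too small; a standalone sketch of the sizing rule follows the first file's diff.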
UnsafeKVExternalSorter.java

@@ -34,6 +34,7 @@
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
numElementsForSpillThreshold,
canUseRadixSort);
} else {
// The array will be used to do an in-place sort, which requires half of the space to be
// empty. Note: each record in the map takes two entries in the array, one is the record
// pointer, the other is the key prefix.
assert(map.numKeys() * 2 <= map.getArray().size() / 2);
// `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
// `UnsafeInMemorySorter`'s point array needs to hold all the entries. Since
// `BytesToBytesMap` can have duplicated keys, we need a check to make sure the point
// array can hold all the entries in the `BytesToBytesMap`.
final LongArray pointArray;
// The point array will be used to do an in-place sort, which requires half of the space
// to be empty. Note: each record in the map takes two entries in the point array, one is
// the record pointer, the other is the key prefix.
if (map.numValues() > map.getArray().size() / 4) {
pointArray = map.allocateArray(map.numValues() * 4);
} else {
pointArray = map.getArray();
}

// During spilling, the array in the map will not be used, so we can borrow it and use it
// as the underlying array for the in-memory sorter (it's always large enough).
// Since we will not grow the array, it's fine to pass `null` as the consumer.
@@ -110,7 +121,7 @@ public UnsafeKVExternalSorter(
taskMemoryManager,
comparatorSupplier.get(),
prefixComparator,
map.getArray(),
pointArray,
canUseRadixSort);

// We cannot use the destructive iterator here because we are reusing the existing memory
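The new branch encodes a sizing rule: each record needs two longs in the point array (a record pointer plus a key prefix), and the in-place radix sort needs the array to be at most half full, so the borrowed array is safe only when it has at least numValues * 4 slots. Below is a minimal, standalone sketch of that rule under those assumptions; `PointArraySizing` and its methods are illustrative names, not Spark API:

public class PointArraySizing {

  // Longs the in-memory sorter needs for `numValues` records: 2 longs per
  // record, doubled so half the array stays empty for in-place sorting.
  static long requiredLongs(long numValues) {
    return numValues * 4;
  }

  // Mirrors the fixed condition: borrow the map's array only if every value fits.
  static boolean canBorrowMapArray(long numValues, long mapArraySize) {
    return numValues <= mapArraySize / 4;
  }

  public static void main(String[] args) {
    // 65 values against a 128-long map array: borrowing would overflow.
    System.out.println(canBorrowMapArray(65, 128)); // false
    System.out.println(requiredLongs(65));          // 260
  }
}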
UnsafeKVExternalSorterSuite.scala

@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.map.BytesToBytesMap

/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
spill = true
)
}

test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
val memoryManager = new TestMemoryManager(new SparkConf())
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())

// Key and value are unsafe rows with a single int column
val schema = new StructType().add("i", IntegerType)
val key = new UnsafeRow(1)
key.pointTo(new Array[Byte](32), 32)
key.setInt(0, 1)
val value = new UnsafeRow(1)
value.pointTo(new Array[Byte](32), 32)
value.setInt(0, 2)

for (_ <- 1 to 65) {
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
loc.append(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
}

// Make sure we can successfully create an UnsafeKVExternalSorter with a `BytesToBytesMap`
// that has duplicated keys and more entries than its point array can cover.
try {
TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
new UnsafeKVExternalSorter(
schema,
schema,
sparkContext.env.blockManager,
sparkContext.env.serializerManager,
taskMemoryManager.pageSizeBytes(),
Int.MaxValue,
map)
} finally {
TaskContext.unset()
}
}
}
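Why 65 entries trip the check: assuming BytesToBytesMap reserves two longs per slot (a layout detail not shown in this diff), the initial capacity of 64 yields a 128-long point array, which can back an in-place sort of at most 128 / 4 = 32 records. Sixty-five duplicates of one key give numValues() = 65 > 32, so the fixed constructor allocates a fresh 65 * 4 = 260-long array, while the old assertion, keyed to numKeys() == 1, passed and handed the sorter an array with too little room. This is exactly the case the sketch after the first diff prints.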
