[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail #20561

Closed · wants to merge 4 commits
UnsafeKVExternalSorter.java
@@ -34,6 +34,7 @@
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
+ import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,10 +99,22 @@ public UnsafeKVExternalSorter(
numElementsForSpillThreshold,
canUseRadixSort);
} else {
- // The array will be used to do in-place sort, which require half of the space to be empty.
- // Note: each record in the map takes two entries in the array, one is record pointer,
- // another is the key prefix.
- assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+ LongArray pointArray = map.getArray();
+ // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+ // `UnsafeInMemorySorter`'s point array needs to hold all the entries. Since `BytesToBytesMap`
Contributor:
It's possible to change UnsafeInMemorySorter to have multiple entries with the same key.

cloud-fan (Author) commented on Feb 10, 2018:
Yea, but it's not trivial; I'd like to do it later. The required change I can think of: BytesToBytesMap is actually a key -> list[value] map, and we need to provide a way to iterate key -> list[value] instead of key -> value.

+ // can have duplicated keys, here we need a check to make sure the point array can hold
+ // all the entries in `BytesToBytesMap`.
+ // The point array will be used to do in-place sort, which requires half of the space to be
+ // empty. Note: each record in the map takes two entries in the point array, one is record
+ // pointer, another is key prefix. So the required size of point array is `numRecords * 4`.
+ // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
+ // so that we can always reuse the point array.
+ if (map.numValues() > pointArray.size() / 4) {
+ // Here we ask the map to allocate memory, so that the memory manager won't ask the map
+ // to spill, if the memory is not enough.
+ pointArray = map.allocateArray(map.numValues() * 4L);
+ }

// During spilling, the array in map will not be used, so we can borrow that and use it
// as the underlying array for in-memory sorter (it's always large enough).
Member:
Shall we update the comment here too?

// Since we will not grow the array, it's fine to pass `null` as consumer.
@@ -110,7 +123,7 @@ public UnsafeKVExternalSorter(
taskMemoryManager,
comparatorSupplier.get(),
prefixComparator,
- map.getArray(),
+ pointArray,
canUseRadixSort);

// We cannot use the destructive iterator here because we are reusing the existing memory
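To make the sizing arithmetic in the new comment concrete, here is a small worked example in plain Java. The numbers are hypothetical (chosen to match the regression test below), and the "two long slots per bucket" layout of BytesToBytesMap's internal array is my reading of that class, not something stated in this PR:

    // Worked example of the `numRecords * 4` requirement (hypothetical numbers).
    public class PointArraySizing {
      public static void main(String[] args) {
        long capacity = 64;                  // map bucket capacity, as in the test below
        long mapArraySlots = capacity * 2;   // assumed layout: 2 long slots per bucket -> 128
        // UnsafeInMemorySorter needs 2 slots per record (record pointer + key prefix)
        // and half the array free for the in-place sort, i.e. 4 slots per record.
        long maxSortableRecords = mapArraySlots / 4;  // 128 / 4 = 32 records
        long numValues = 65;                 // duplicated-key records, as in the test below
        if (numValues > maxSortableRecords) {
          long requiredSlots = numValues * 4L;  // the PR allocates numValues * 4 -> 260
          System.out.println("cannot reuse map array: need " + requiredSlots
              + " slots, map array only has " + mapArraySlots);
        }
      }
    }

With a single distinct key the map never grows past its initial capacity, so the reused array would be far too small for 65 records; that is exactly the gap the new `map.numValues() > pointArray.size() / 4` check closes.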
UnsafeKVExternalSorterSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+ import org.apache.spark.unsafe.map.BytesToBytesMap

/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
spill = true
)
}

test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
val memoryManager = new TestMemoryManager(new SparkConf())
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())

// Key/value are a unsafe rows with a single int column
val schema = new StructType().add("i", IntegerType)
val key = new UnsafeRow(1)
key.pointTo(new Array[Byte](32), 32)
key.setInt(0, 1)
val value = new UnsafeRow(1)
value.pointTo(new Array[Byte](32), 32)
value.setInt(0, 2)

for (_ <- 1 to 65) {
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
loc.append(
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
}

// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
// which has duplicated keys and the number of entries exceeds its capacity.
Contributor:
For aggregation, there are no multiple entries for the same key; that only happens for hash join (don't remember the details).

cloud-fan (Author):
Yes, we use BytesToBytesMap to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregate.

+ try {
+ TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
+ new UnsafeKVExternalSorter(
+ schema,
+ schema,
+ sparkContext.env.blockManager,
+ sparkContext.env.serializerManager,
+ taskMemoryManager.pageSizeBytes(),
+ Int.MaxValue,
+ map)
+ } finally {
+ TaskContext.unset()
+ }
+ }
}
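For readers skimming the test: Location.append, unlike an aggregation-style update, keeps every duplicate, so numValues() outgrows what the map's array was sized for while numKeys() stays at 1. A stripped-down Java sketch of that shape, plus the key -> list[value] iteration the review thread mentions. This is a sketch under assumptions, not code from the PR: the caller must supply a working TaskMemoryManager, and nextValue()/getValueBase() are my recollection of the Spark 2.3-era Location API:

    import org.apache.spark.memory.TaskMemoryManager;
    import org.apache.spark.unsafe.Platform;
    import org.apache.spark.unsafe.map.BytesToBytesMap;

    public class DuplicateKeySketch {
      // Repeated append() with one key makes numValues() outgrow numKeys().
      // `tmm` must be a working TaskMemoryManager (see TestMemoryManager in the suite).
      static BytesToBytesMap buildWithDuplicates(TaskMemoryManager tmm) {
        BytesToBytesMap map = new BytesToBytesMap(tmm, 64, tmm.pageSizeBytes());
        byte[] key = new byte[8];
        byte[] value = new byte[8];
        for (int i = 0; i < 65; i++) {
          BytesToBytesMap.Location loc =
              map.lookup(key, Platform.BYTE_ARRAY_OFFSET, key.length);
          // append() always adds a new entry, even when the key already exists,
          // so numValues() grows while numKeys() stays at 1.
          loc.append(key, Platform.BYTE_ARRAY_OFFSET, key.length,
                     value, Platform.BYTE_ARRAY_OFFSET, value.length);
        }
        return map;  // map.numKeys() == 1, map.numValues() == 65
      }

      // Hedged sketch of key -> list[value] iteration: Location chains
      // duplicate-key values; treat nextValue() and the getValue*() accessors
      // as assumptions if your Spark version differs.
      static int countValuesForKey(BytesToBytesMap map, byte[] key) {
        BytesToBytesMap.Location loc =
            map.lookup(key, Platform.BYTE_ARRAY_OFFSET, key.length);
        int n = 0;
        if (loc.isDefined()) {
          do {
            n++;  // loc.getValueBase()/getValueOffset()/getValueLength() point at this value
          } while (loc.nextValue());  // advance along the duplicate-key chain
        }
        return n;
      }
    }

What the TODO in the main change asks for is the harder follow-up: teaching UnsafeInMemorySorter itself to accept multiple entries per key, so the map's array could always be reused without reallocation.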