From 51d381f1f0889fe95db0ff375949030aba271e89 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 9 Feb 2018 23:00:14 +0800 Subject: [PATCH 1/4] creating UnsafeKVExternalSorter with BytesToBytesMap may fail --- .../sql/execution/UnsafeKVExternalSorter.java | 21 +++++++--- .../UnsafeKVExternalSorterSuite.scala | 39 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index b0b5383a081a0..147fe3636456a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -34,6 +34,7 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.KVIterator; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.map.BytesToBytesMap; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.unsafe.sort.*; @@ -98,10 +99,20 @@ public UnsafeKVExternalSorter( numElementsForSpillThreshold, canUseRadixSort); } else { - // The array will be used to do in-place sort, which require half of the space to be empty. - // Note: each record in the map takes two entries in the array, one is record pointer, - // another is the key prefix. - assert(map.numKeys() * 2 <= map.getArray().size() / 2); + // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but + // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` + // can have duplicated keys, here we need a check to make sure the point array can hold + // all the entries in `BytesToBytesMap`. + final LongArray pointArray; + // The point array will be used to do in-place sort, which require half of the space to be + // empty. Note: each record in the map takes two entries in the point array, one is record + // pointer, another is the key prefix. + if (map.numValues() > map.getArray().size() / 4) { + pointArray = map.allocateArray(map.numValues() * 4); + } else { + pointArray = map.getArray(); + } + // During spilling, the array in map will not be used, so we can borrow that and use it // as the underlying array for in-memory sorter (it's always large enough). // Since we will not grow the array, it's fine to pass `null` as consumer. @@ -110,7 +121,7 @@ public UnsafeKVExternalSorter( taskMemoryManager, comparatorSupplier.get(), prefixComparator, - map.getArray(), + pointArray, canUseRadixSort); // We cannot use the destructive iterator here because we are reusing the existing memory diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala index 6af9f8b77f8d3..bf588d3bb7841 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.map.BytesToBytesMap /** * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data. @@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { spill = true ) } + + test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") { + val memoryManager = new TestMemoryManager(new SparkConf()) + val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) + val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes()) + + // Key/value are a unsafe rows with a single int column + val schema = new StructType().add("i", IntegerType) + val key = new UnsafeRow(1) + key.pointTo(new Array[Byte](32), 32) + key.setInt(0, 1) + val value = new UnsafeRow(1) + value.pointTo(new Array[Byte](32), 32) + value.setInt(0, 2) + + for (_ <- 1 to 65) { + val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes) + loc.append( + key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, + value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) + } + + // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap` + // which has duplicated keys and the number of entries exceeds its capacity. + try { + TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null)) + new UnsafeKVExternalSorter( + schema, + schema, + sparkContext.env.blockManager, + sparkContext.env.serializerManager, + taskMemoryManager.pageSizeBytes(), + Int.MaxValue, + map) + } finally { + TaskContext.unset() + } + } } From 8dab79a46d9201ec0e43e60d6ef841bf91f4c616 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 10 Feb 2018 11:16:38 +0800 Subject: [PATCH 2/4] address comments --- .../apache/spark/sql/execution/UnsafeKVExternalSorter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 147fe3636456a..7e44caa013c4f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -107,8 +107,10 @@ public UnsafeKVExternalSorter( // The point array will be used to do in-place sort, which require half of the space to be // empty. Note: each record in the map takes two entries in the point array, one is record // pointer, another is the key prefix. + // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key, + // so that we can always reuse the point array. if (map.numValues() > map.getArray().size() / 4) { - pointArray = map.allocateArray(map.numValues() * 4); + pointArray = map.allocateArray(map.numValues() * 4L); } else { pointArray = map.getArray(); } From 151a92dff074bff26ad179bedbdd4b49f345ec93 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 11 Feb 2018 11:48:18 +0800 Subject: [PATCH 3/4] more comments --- .../spark/sql/execution/UnsafeKVExternalSorter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 7e44caa013c4f..1ec3209e8e2be 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -99,20 +99,20 @@ public UnsafeKVExternalSorter( numElementsForSpillThreshold, canUseRadixSort); } else { + LongArray pointArray = map.getArray(); // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` // can have duplicated keys, here we need a check to make sure the point array can hold // all the entries in `BytesToBytesMap`. - final LongArray pointArray; - // The point array will be used to do in-place sort, which require half of the space to be + // The point array will be used to do in-place sort, which requires half of the space to be // empty. Note: each record in the map takes two entries in the point array, one is record - // pointer, another is the key prefix. + // pointer, another is key prefix. So the required size of point array is `numRecords * 4`. // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key, // so that we can always reuse the point array. - if (map.numValues() > map.getArray().size() / 4) { + if (map.numValues() > pointArray.size() / 4) { + // Here we ask the map to allocate memory, so that the memory manager won't ask the map + // to spill, if the memory is not enough. pointArray = map.allocateArray(map.numValues() * 4L); - } else { - pointArray = map.getArray(); } // During spilling, the array in map will not be used, so we can borrow that and use it From 2e7a5ad9063d51116c1180b1c8285631edb8ce65 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 11 Feb 2018 14:07:24 +0800 Subject: [PATCH 4/4] address comment --- .../sql/execution/UnsafeKVExternalSorter.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 1ec3209e8e2be..9eb03430a7db2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -99,31 +99,33 @@ public UnsafeKVExternalSorter( numElementsForSpillThreshold, canUseRadixSort); } else { - LongArray pointArray = map.getArray(); - // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but - // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` - // can have duplicated keys, here we need a check to make sure the point array can hold - // all the entries in `BytesToBytesMap`. - // The point array will be used to do in-place sort, which requires half of the space to be - // empty. Note: each record in the map takes two entries in the point array, one is record - // pointer, another is key prefix. So the required size of point array is `numRecords * 4`. + // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow + // that and use it as the pointer array for `UnsafeInMemorySorter`. + LongArray pointerArray = map.getArray(); + // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but + // `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since + // `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer + // array can hold all the entries in `BytesToBytesMap`. + // The pointer array will be used to do in-place sort, which requires half of the space to be + // empty. Note: each record in the map takes two entries in the pointer array, one is record + // pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`. // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key, - // so that we can always reuse the point array. - if (map.numValues() > pointArray.size() / 4) { + // so that we can always reuse the pointer array. + if (map.numValues() > pointerArray.size() / 4) { // Here we ask the map to allocate memory, so that the memory manager won't ask the map // to spill, if the memory is not enough. - pointArray = map.allocateArray(map.numValues() * 4L); + pointerArray = map.allocateArray(map.numValues() * 4L); } - // During spilling, the array in map will not be used, so we can borrow that and use it - // as the underlying array for in-memory sorter (it's always large enough). - // Since we will not grow the array, it's fine to pass `null` as consumer. + // Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed + // to be large enough, it's fine to pass `null` as consumer because we won't allocate more + // memory. final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( null, taskMemoryManager, comparatorSupplier.get(), prefixComparator, - pointArray, + pointerArray, canUseRadixSort); // We cannot use the destructive iterator here because we are reusing the existing memory