[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail #20561
Conversation
```java
// another is the key prefix.
assert(map.numKeys() * 2 <= map.getArray().size() / 2);
// `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
// `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
```
It's possible to change `UnsafeInMemorySorter` to have multiple entries with the same key.
Yea, but it's not trivial; I'd like to do it later. The required change I can think of: `BytesToBytesMap` is actually a key -> list[value] map, and we need to provide a way to iterate key -> list[value] instead of key -> value.
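For illustration, a minimal sketch of what such an iteration API could look like. All names here are invented; this is not an actual Spark interface:

```java
import java.util.Iterator;

// Hypothetical shape of the change described above: expose the map as
// key -> list[value] instead of key -> value, so a sorter could keep one
// entry per distinct key and walk the value list at merge time.
interface KeyMultiValueIterator<K, V> {
  boolean nextKey();                  // advance to the next distinct key
  K currentKey();                     // the current distinct key
  Iterator<V> valuesForCurrentKey();  // all values stored under this key
}
```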
```java
// empty. Note: each record in the map takes two entries in the point array, one is record
// pointer, another is the key prefix.
if (map.numValues() > map.getArray().size() / 4) {
  pointArray = map.allocateArray(map.numValues() * 4);
```
The allocation may fail.
Since overflow may occur (e.g. `0x70000000 * 4`), should we use `* 4L` instead of `* 4`?
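To see the concern concretely, a quick sketch (assuming `numValues()` returns an `int`, which is what the question implies):

```java
public class OverflowDemo {
  public static void main(String[] args) {
    int numValues = 0x70000000;          // 1,879,048,192 entries (hypothetical)
    System.out.println(numValues * 4);   // -1073741824: the int product wraps around
    System.out.println(numValues * 4L);  // 7516192768: widening to long is safe
  }
}
```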
`map.allocateArray` will trigger other consumers to spill if memory is not enough. If the allocation still fails, there is nothing we can do; just let the execution fail.
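For context, a self-contained sketch of the allocate-or-spill behavior being described. All names are invented here; this is not Spark's actual `MemoryConsumer` API:

```java
import java.util.List;

// Models the behavior described above: when an allocation doesn't fit, other
// consumers are asked to spill first; if memory is still short afterwards,
// the allocation fails and the task execution fails with it.
final class MemoryPool {
  interface Spillable { long spill(); }  // returns number of slots freed

  private long freeSlots;

  MemoryPool(long freeSlots) { this.freeSlots = freeSlots; }

  long[] allocateArray(long size, List<Spillable> otherConsumers) {
    if (freeSlots < size) {
      for (Spillable consumer : otherConsumers) {  // trigger spills
        freeSlots += consumer.spill();
        if (freeSlots >= size) break;
      }
    }
    if (freeSlots < size) {
      // Nothing more we can do: let the execution fail, as noted above.
      throw new IllegalStateException("Unable to acquire " + size + " slots");
    }
    freeSlots -= size;
    return new long[(int) size];
  }
}
```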
```java
}
```

```scala
// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
// which has duplicated keys and the number of entries exceeds its capacity.
```
For aggregation, there are no multiple entries for the same key; that only happens for hash join (I don't remember the details).
Yes, we use `BytesToBytesMap` to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregate.
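A small sketch of that reasoning with illustrative numbers (the figures are made up; the check itself mirrors the one in the diff above):

```java
public class ReuseCheckDemo {
  public static void main(String[] args) {
    long arraySize = 4 * 64;  // map point array sized for 64 distinct keys

    // Aggregation: one entry per key, so numValues == numKeys == 64.
    boolean aggNeedsNewArray = 64 > arraySize / 4;    // false -> reuse, no regression
    System.out.println(aggNeedsNewArray);

    // Broadcast-join build side: 64 distinct keys but 200 entries overall.
    boolean joinNeedsNewArray = 200 > arraySize / 4;  // true -> allocate a new array
    System.out.println(joinNeedsNewArray);
  }
}
```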
Test build #87263 has finished for PR 20561 at commit
Test build #87277 has finished for PR 20561 at commit
retest this please
Test build #87279 has finished for PR 20561 at commit
retest this please
Test build #87282 has finished for PR 20561 at commit
lgtm
```java
  // to spill, if the memory is not enough.
  pointArray = map.allocateArray(map.numValues() * 4L);
}

// During spilling, the array in map will not be used, so we can borrow that and use it
// as the underlying array for in-memory sorter (it's always large enough).
```
Shall we update the comment here too?
LGTM
Test build #87294 has finished for PR 20561 at commit
Test build #87300 has finished for PR 20561 at commit
Test build #87298 has finished for PR 20561 at commit
retest this please.
Test build #87303 has finished for PR 20561 at commit
retest this please.
Test build #87310 has finished for PR 20561 at commit
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

## What changes were proposed in this pull request?

This is a long-standing bug in `UnsafeKVExternalSorter` and was reported on the dev list multiple times. When creating `UnsafeKVExternalSorter` with `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter` to sort the data in `BytesToBytesMap`. The data format of the sorter and the map is the same, so no data movement is required. However, both the sorter and the map need a point array for some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the point array between the sorter and the map, to avoid an extra memory allocation. This sounds like a reasonable optimization: the length of the `BytesToBytesMap` point array is at least 4 times the number of keys (to avoid hash collisions, the hash table size should be at least 2 times the number of keys, and each key occupies 2 slots), and `UnsafeInMemorySorter` needs the pointer array size to be 4 times the number of entries, so it seems safe to reuse the point array.

However, the number of keys in the map doesn't equal the number of entries in the map, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption of the above optimization, and we may run out of space when inserting data into the sorter and hit the error

```
java.lang.IllegalStateException: There is no space for new record
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```

This PR fixes this bug by creating a new point array if the existing one is not big enough.

## How was this patch tested?

a new test

Author: Wenchen Fan <[email protected]>

Closes #20561 from cloud-fan/bug.

(cherry picked from commit 4bbd744) Signed-off-by: Wenchen Fan <[email protected]>
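To make the capacity math above concrete, here is a self-contained simulation of the failure mode. The numbers and names are invented, and the 2-slots-per-record-plus-radix-sort-headroom layout is only an approximation of `UnsafeInMemorySorter`:

```java
public class PointArrayOverflowDemo {
  public static void main(String[] args) {
    int numDistinctKeys = 64;
    // What the map guarantees: point array length >= 4 * number of distinct keys.
    long[] pointArray = new long[numDistinctKeys * 4];  // 256 slots

    int numEntries = 200;  // duplicated keys: far more entries than distinct keys
    int pos = 0;
    for (int i = 0; i < numEntries; i++) {
      // Each record takes 2 slots (record pointer + key prefix), and only half
      // the array is usable because radix sort needs equal scratch space.
      if ((pos + 2) * 2 > pointArray.length) {
        // The 65th record no longer fits: the analogue of the reported
        // "java.lang.IllegalStateException: There is no space for new record".
        throw new IllegalStateException("There is no space for new record");
      }
      pointArray[pos++] = i;  // stand-in for the record pointer
      pointArray[pos++] = i;  // stand-in for the key prefix
    }
  }
}
```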
thanks, merging to master/2.3/2.2!