
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail #20561

Closed
wants to merge 4 commits into apache:master from cloud-fan:bug

Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

This is a long-standing bug in `UnsafeKVExternalSorter` and was reported on the dev list multiple times.

When creating `UnsafeKVExternalSorter` with `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter` to sort the data in the `BytesToBytesMap`. The data format of the sorter and the map is the same, so no data movement is required. However, both the sorter and the map need a pointer array for some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the pointer array between the sorter and the map, to avoid an extra memory allocation. This sounds reasonable: the length of the `BytesToBytesMap` pointer array is at least 4 times the number of keys (to avoid hash collisions, the hash table size should be at least 2 times the number of keys, and each key occupies 2 slots), and `UnsafeInMemorySorter` needs the pointer array size to be 4 times the number of entries, so it is safe to reuse the pointer array.

However, the number of keys in the map doesn't equal the number of entries in the map, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption behind the optimization: we may run out of space when inserting data into the sorter and hit this error:

```
java.lang.IllegalStateException: There is no space for new record
   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
   at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```
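To make the mismatch concrete, here is a worked sizing example (the numbers are illustrative, not from the PR):

```
// Safe case: every key is distinct, so entries == keys.
int numKeys = 1_000;
long mapArrayLen = 4L * numKeys;   // lower bound from the sizing rule above
long sorterNeeds = 4L * numKeys;   // the sorter wants 4 slots per entry
// mapArrayLen >= sorterNeeds, so reusing the array is safe.

// Broken case: 1,000 distinct keys, but 10 values appended per key.
int numEntries = 10 * numKeys;     // 10,000 entries in the map
sorterNeeds = 4L * numEntries;     // 40,000 slots needed by the sorter
// mapArrayLen may still be only ~4,000 slots -> "There is no space for new record".
```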

This PR fixes the bug by creating a new pointer array if the existing one is not big enough.
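Assembled from the review hunks below, the check in `UnsafeKVExternalSorter`'s constructor ends up looking roughly like this (surrounding code elided):

```
LongArray pointArray = map.getArray();
// The map's pointer array is only guaranteed to cover the distinct keys, but the
// sorter must cover every entry; with duplicated keys it may be too small.
if (map.numValues() > pointArray.size() / 4) {
  // `allocateArray` may trigger other consumers to spill if memory is not enough.
  pointArray = map.allocateArray(map.numValues() * 4L);
}
```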

How was this patch tested?

A new test.

@cloud-fan (Contributor, Author)

```
// another is the key prefix.
assert(map.numKeys() * 2 <= map.getArray().size() / 2);
// `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
// `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
```
Contributor:

It's possible to change UnsafeInMemorySorter to have multiple entries with the same key.

@cloud-fan (Contributor, Author) replied on Feb 10, 2018:

Yeah, but it's not trivial; I'd like to do it later. The required change I can think of: `BytesToBytesMap` is actually a key -> list[value] map, and we need to provide a way to iterate key -> list[value] instead of key -> value.
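A hypothetical shape of that future API, purely to illustrate the comment above (none of these names exist in Spark):

```
// Hypothetical sketch only: expose each distinct key once, with all of its
// appended values, instead of one (key, value) pair per map entry.
interface KeyWithValuesIterator<K, V> {
  boolean nextKey();                      // advance to the next distinct key
  K currentKey();                         // the current distinct key
  java.util.Iterator<V> currentValues();  // every value appended under currentKey()
}
```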

```
// empty. Note: each record in the map takes two entries in the point array, one is record
// pointer, another is the key prefix.
if (map.numValues() > map.getArray().size() / 4) {
  pointArray = map.allocateArray(map.numValues() * 4);
}
```
Contributor:

The allocation may fail.

@kiszk (Member) commented on Feb 9, 2018:

Since overflow may occur (e.g. 0x70000000 * 4), should we use * 4L instead of * 4?
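A quick demonstration of the overflow kiszk points out (the constant is his example value; the rest is illustrative):

```
public class OverflowDemo {
  public static void main(String[] args) {
    int numValues = 0x70000000;          // ~1.88 billion, kiszk's example value
    System.out.println(numValues * 4);   // -1073741824: the int product overflows
    System.out.println(numValues * 4L);  // 7516192768: widened to long before multiplying
  }
}
```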

@cloud-fan (Contributor, Author) replied on Feb 10, 2018:

`map.allocateArray` will trigger other consumers to spill if memory is not enough. If the allocation still fails, there is nothing we can do; just let the execution fail.
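For context, the fallback chain cloud-fan describes, simplified; the steps below are assumptions based on the general behavior of Spark's `MemoryConsumer`, which `BytesToBytesMap` extends:

```
// 1. allocateArray asks the task's memory manager for numValues * 4 longs
//    (8 bytes each) of execution memory.
// 2. If memory is tight, the TaskMemoryManager asks other memory consumers in
//    the same task to spill before giving up.
// 3. If there is still not enough memory, the allocation throws an OOM error
//    and the task fails -- the "nothing we can do" case above.
LongArray pointArray = map.allocateArray(map.numValues() * 4L);
```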


```
// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
// which has duplicated keys and the number of entries exceeds its capacity.
```
Contributor:

For aggregation, there are no multiple entries for the same key; that only happens for hash join (I don't remember the details).

@cloud-fan (Contributor, Author) replied:

Yes, we use `BytesToBytesMap` to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregation.
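A rough sketch of the scenario the new test exercises, reconstructed from the test comment shown above. The setup variables (taskMemoryManager, pageSizeBytes, key/value buffers, schemas) are assumed, and the exact constructor argument list may differ; the real test lives in UnsafeKVExternalSorterSuite:

```
// Start with a small map so a modest number of duplicates exceeds its capacity.
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, pageSizeBytes);

// Append the SAME key repeatedly: numValues() grows past what the pointer
// array can cover, while numKeys() stays at 1.
for (int i = 0; i < 65; i++) {
  BytesToBytesMap.Location loc = map.lookup(keyBase, keyOffset, keyLength);
  loc.append(keyBase, keyOffset, keyLength, valueBase, valueOffset, valueLength);
}

// Before this fix, this constructor failed with "There is no space for new
// record"; now it allocates a bigger pointer array when numValues() requires it.
UnsafeKVExternalSorter sorter = new UnsafeKVExternalSorter(
  keySchema, valueSchema, blockManager, serializerManager,
  pageSizeBytes, Integer.MAX_VALUE /* spill threshold */, map);
```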

@SparkQA commented Feb 9, 2018

Test build #87263 has finished for PR 20561 at commit 51d381f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 10, 2018

Test build #87277 has finished for PR 20561 at commit 8dab79a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 10, 2018

retest this please

@SparkQA commented Feb 10, 2018

Test build #87279 has finished for PR 20561 at commit 8dab79a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 10, 2018

retest this please

@SparkQA commented Feb 10, 2018

Test build #87282 has finished for PR 20561 at commit 8dab79a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor) commented Feb 10, 2018

lgtm

```
  // to spill, if the memory is not enough.
  pointArray = map.allocateArray(map.numValues() * 4L);
}
```

```
// During spilling, the array in map will not be used, so we can borrow that and use it
// as the underlying array for in-memory sorter (it's always large enough).
```
Member:

Shall we update the comment here too?

@viirya (Member) left a comment:

LGTM

@SparkQA commented Feb 11, 2018

Test build #87294 has finished for PR 20561 at commit 151a92d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 11, 2018

Test build #87300 has finished for PR 20561 at commit 2e7a5ad.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 11, 2018

Test build #87298 has finished for PR 20561 at commit 5e93313.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Feb 11, 2018

retest this please.

@SparkQA commented Feb 11, 2018

Test build #87303 has finished for PR 20561 at commit 2e7a5ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Feb 11, 2018

retest this please.

@SparkQA commented Feb 11, 2018

Test build #87310 has finished for PR 20561 at commit 2e7a5ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Feb 11, 2018
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

Author: Wenchen Fan <[email protected]>

Closes #20561 from cloud-fan/bug.

(cherry picked from commit 4bbd744)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit closed this in 4bbd744 on Feb 11, 2018
@cloud-fan (Contributor, Author):

thanks, merging to master/2.3/2.2!

asfgit pushed a commit that referenced this pull request Feb 11, 2018
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

Author: Wenchen Fan <[email protected]>

Closes #20561 from cloud-fan/bug.

(cherry picked from commit 4bbd744)
Signed-off-by: Wenchen Fan <[email protected]>
robert3005 pushed a commit to palantir/spark that referenced this pull request Feb 12, 2018
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

Author: Wenchen Fan <[email protected]>

Closes apache#20561 from cloud-fan/bug.
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

Author: Wenchen Fan <[email protected]>

Closes apache#20561 from cloud-fan/bug.

(cherry picked from commit 4bbd744)
Signed-off-by: Wenchen Fan <[email protected]>