Skip to content

Commit

Permalink
Make UnsafeExternalSorter's page size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 29, 2015
1 parent 0045aa2 commit ba54d4b
Showing 1 changed file with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {

private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);

private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
@VisibleForTesting
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;

private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
Expand Down Expand Up @@ -91,6 +88,7 @@ public UnsafeExternalSorter(
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
}

Expand Down Expand Up @@ -147,9 +145,12 @@ public void spill() throws IOException {
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return sorter.getMemoryUsage() + totalPageSize;
}

@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
Expand Down Expand Up @@ -214,23 +215,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
if (requiredSpace > PAGE_SIZE) {
if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(PAGE_SIZE);
currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = PAGE_SIZE;
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
Expand Down

0 comments on commit ba54d4b

Please sign in to comment.