[SPARK-9411] [SQL] Make Tungsten page sizes configurable
We need to make page sizes configurable so we can reduce them in unit tests and increase them in real production workloads.  These sizes are now controlled by a new configuration, `spark.buffer.pageSize`.  The new default is 64 megabytes.

Author: Josh Rosen <[email protected]>

Closes #7741 from JoshRosen/SPARK-9411 and squashes the following commits:

a43c4db [Josh Rosen] Fix pow
2c0eefc [Josh Rosen] Fix MAXIMUM_PAGE_SIZE_BYTES comment + value
bccfb51 [Josh Rosen] Lower page size to 4MB in TestHive
ba54d4b [Josh Rosen] Make UnsafeExternalSorter's page size configurable
0045aa2 [Josh Rosen] Make UnsafeShuffle's page size configurable
bc734f0 [Josh Rosen] Rename configuration
e614858 [Josh Rosen] Makes BytesToBytesMap page size configurable
JoshRosen authored and rxin committed Jul 29, 2015
1 parent b715933 commit 1b0099f
Showing 12 changed files with 112 additions and 65 deletions.
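The new `spark.buffer.pageSize` setting is an ordinary SparkConf entry, so it can be lowered for memory-constrained tests or raised for production workloads without code changes. A minimal sketch of setting it and reading it back (the app name is illustrative; the "4m" and "64m" values mirror the ones used in this diff):

```java
import org.apache.spark.SparkConf;

public class PageSizeConfigExample {
  public static void main(String[] args) {
    // Shrink Tungsten data pages, e.g. for a memory-constrained test run.
    SparkConf conf = new SparkConf()
        .setAppName("page-size-demo")         // illustrative app name
        .set("spark.buffer.pageSize", "4m");  // same value TestHive uses below

    // Consumers read the setting back in bytes, falling back to the 64 MB
    // default when it is unset, mirroring the getSizeAsBytes calls in this diff.
    long pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
    System.out.println("Effective page size: " + pageSizeBytes + " bytes");
  }
}
```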
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {

private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);

private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
@VisibleForTesting
static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
@VisibleForTesting
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;

private final int initialSize;
private final int numPartitions;
private final int pageSizeBytes;
@VisibleForTesting
final int maxRecordSizeBytes;
private final TaskMemoryManager memoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
@@ -109,7 +109,10 @@ public UnsafeShuffleExternalSorter(
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;

this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
}
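For the shuffle sorter the configured size is additionally clamped, because the packed record pointer encoding caps how large a page can be. A standalone sketch of that clamp (the 128 MB constant is assumed to match PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES; the helper name is illustrative, and only the Math.min logic comes from the constructor above):

```java
public class PageSizeClampExample {
  // Assumed to match PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES (128 MB).
  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;

  // Mirrors the constructor above: the configured size wins unless it exceeds
  // what the record-pointer encoding can address.
  static int effectivePageSize(long configuredBytes) {
    return (int) Math.min(MAXIMUM_PAGE_SIZE_BYTES, configuredBytes);
  }

  public static void main(String[] args) {
    System.out.println(effectivePageSize(64L * 1024 * 1024));   // 67108864: 64 MB kept as-is
    System.out.println(effectivePageSize(512L * 1024 * 1024));  // 134217728: clamped to 128 MB
  }
}
```

The derived maxRecordSizeBytes is then four bytes smaller than the page, presumably leaving room for the record's 4-byte length prefix.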
@@ -272,7 +275,11 @@ void spill() throws IOException {
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return sorter.getMemoryUsage() + totalPageSize;
}

private long freeMemory() {
@@ -346,23 +353,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
if (requiredSpace > PAGE_SIZE) {
if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(PAGE_SIZE);
currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = PAGE_SIZE;
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
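Both sorters allocate new pages with the same acquire-spill-retry pattern: request a full page from the shuffle memory manager, and if the grant comes back short, return it, spill to free memory, and try exactly once more before giving up. A compact, self-contained sketch of that control flow (MemoryBroker and acquirePageOrThrow are illustrative stand-ins, not Spark classes):

```java
import java.io.IOException;

public class AcquireSpillRetryExample {
  /** Minimal stand-in for ShuffleMemoryManager, used only for this sketch. */
  interface MemoryBroker {
    long tryToAcquire(long bytes); // returns how many bytes were actually granted
    void release(long bytes);
  }

  static long acquirePageOrThrow(MemoryBroker broker, Runnable spill, long pageSizeBytes)
      throws IOException {
    long granted = broker.tryToAcquire(pageSizeBytes);
    if (granted < pageSizeBytes) {
      // A partial grant is useless for a fixed-size page: give it back,
      // spill, and retry once, exactly like the sorter above.
      broker.release(granted);
      spill.run();
      long afterSpill = broker.tryToAcquire(pageSizeBytes);
      if (afterSpill != pageSizeBytes) {
        broker.release(afterSpill);
        throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
      }
      return afterSpill;
    }
    return granted;
  }

  public static void main(String[] args) throws IOException {
    // Toy broker that can only grant 1 MB until a spill frees more memory.
    final long[] available = {1L << 20};
    MemoryBroker broker = new MemoryBroker() {
      @Override public long tryToAcquire(long bytes) {
        long granted = Math.min(bytes, available[0]);
        available[0] -= granted;
        return granted;
      }
      @Override public void release(long bytes) { available[0] += bytes; }
    };
    long pageSizeBytes = 4L << 20; // pretend spark.buffer.pageSize=4m
    long page = acquirePageOrThrow(broker, () -> available[0] = 64L << 20, pageSizeBytes);
    System.out.println("Acquired a " + page + "-byte page after spilling");
  }
}
```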
@@ -129,6 +129,11 @@ public UnsafeShuffleWriter(
open();
}

@VisibleForTesting
public int maxRecordSizeBytes() {
return sorter.maxRecordSizeBytes;
}

/**
* This convenience method should only be called in test code.
*/
@@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {

private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);

private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
@VisibleForTesting
static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;

private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
@@ -91,6 +88,7 @@ public UnsafeExternalSorter(
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
}

@@ -147,7 +145,11 @@ public void spill() throws IOException {
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return sorter.getMemoryUsage() + totalPageSize;
}

@VisibleForTesting
@@ -214,23 +216,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
if (requiredSpace > PAGE_SIZE) {
if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(PAGE_SIZE);
currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = PAGE_SIZE;
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
@@ -111,7 +111,7 @@ public void setUp() throws IOException {
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
partitionSizesInMergedFile = null;
spillFilesCreated.clear();
conf = new SparkConf();
conf = new SparkConf().set("spark.buffer.pageSize", "128m");
taskMetrics = new TaskMetrics();

when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
@@ -512,12 +512,12 @@ public void close() { }
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[1], new byte[1]));
writer.forceSorterToSpill();
// We should be able to write a record that's right _at_ the max record size
final byte[] atMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE];
final byte[] atMaxRecordSize = new byte[writer.maxRecordSizeBytes()];
new Random(42).nextBytes(atMaxRecordSize);
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[0], atMaxRecordSize));
writer.forceSorterToSpill();
// Inserting a record that's larger than the max record size should fail:
final byte[] exceedsMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE + 1];
final byte[] exceedsMaxRecordSize = new byte[writer.maxRecordSizeBytes() + 1];
new Random(42).nextBytes(exceedsMaxRecordSize);
Product2<Object, Object> hugeRecord =
new Tuple2<Object, Object>(new byte[0], exceedsMaxRecordSize);
@@ -95,6 +95,7 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
* @param memoryManager the memory manager used to allocate our Unsafe memory structures.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
* @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
*/
public UnsafeFixedWidthAggregationMap(
@@ -103,11 +104,13 @@ public UnsafeFixedWidthAggregationMap(
StructType groupingKeySchema,
TaskMemoryManager memoryManager,
int initialCapacity,
long pageSizeBytes,
boolean enablePerfMetrics) {
this.aggregationBufferSchema = aggregationBufferSchema;
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
this.map = new BytesToBytesMap(memoryManager, initialCapacity, enablePerfMetrics);
this.map =
new BytesToBytesMap(memoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
this.enablePerfMetrics = enablePerfMetrics;

// Initialize the buffer for aggregation value
@@ -39,6 +39,7 @@ class UnsafeFixedWidthAggregationMapSuite
private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
private def emptyAggregationBuffer: InternalRow = InternalRow(0)
private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes

private var memoryManager: TaskMemoryManager = null

@@ -69,7 +70,8 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
memoryManager,
1024, // initial capacity
1024, // initial capacity,
PAGE_SIZE_BYTES,
false // disable perf metrics
)
assert(!map.iterator().hasNext)
@@ -83,6 +85,7 @@
groupKeySchema,
memoryManager,
1024, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
)
val groupKey = InternalRow(UTF8String.fromString("cats"))
@@ -109,6 +112,7 @@
groupKeySchema,
memoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
)
val rand = new Random(42)
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -260,12 +260,14 @@ case class GeneratedAggregate(
} else if (unsafeEnabled && schemaSupportsUnsafe) {
assert(iter.hasNext, "There should be at least one row for this path")
log.info("Using Unsafe-based aggregator")
val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
val aggregationMap = new UnsafeFixedWidthAggregationMap(
newAggregationBuffer(EmptyRow),
aggregationBufferSchema,
groupKeySchema,
TaskContext.get.taskMemoryManager(),
1024 * 16, // initial capacity
pageSizeBytes,
false // disable tracking of performance metrics
)

@@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.nio.ByteOrder
import java.util.{HashMap => JavaHashMap}

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -259,7 +260,11 @@ private[joins] final class UnsafeHashedRelation(
val nKeys = in.readInt()
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
binaryMap = new BytesToBytesMap(memoryManager, nKeys * 2) // reduce hash collision
val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
binaryMap = new BytesToBytesMap(
memoryManager,
nKeys * 2, // reduce hash collision
pageSizeBytes)

var i = 0
var keyBuffer = new Array[Byte](1024)
@@ -56,6 +56,7 @@ object TestHive
.set("spark.sql.test", "")
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
.set("spark.buffer.pageSize", "4m")
// SPARK-8910
.set("spark.ui.enabled", "false")))

@@ -74,12 +74,6 @@ public final class BytesToBytesMap {
*/
private long pageCursor = 0;

/**
* The size of the data pages that hold key and value data. Map entries cannot span multiple
* pages, so this limits the maximum entry size.
*/
private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes

/**
* The maximum number of keys that BytesToBytesMap supports. The hash table has to be
* power-of-2-sized and its backing Java array can contain at most (1 << 30) elements, since
@@ -117,6 +111,12 @@ public final class BytesToBytesMap {

private final double loadFactor;

/**
* The size of the data pages that hold key and value data. Map entries cannot span multiple
* pages, so this limits the maximum entry size.
*/
private final long pageSizeBytes;

/**
* Number of keys defined in the map.
*/
@@ -153,10 +153,12 @@ public BytesToBytesMap(
TaskMemoryManager memoryManager,
int initialCapacity,
double loadFactor,
long pageSizeBytes,
boolean enablePerfMetrics) {
this.memoryManager = memoryManager;
this.loadFactor = loadFactor;
this.loc = new Location();
this.pageSizeBytes = pageSizeBytes;
this.enablePerfMetrics = enablePerfMetrics;
if (initialCapacity <= 0) {
throw new IllegalArgumentException("Initial capacity must be greater than 0");
@@ -165,18 +167,26 @@
throw new IllegalArgumentException(
"Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY);
}
if (pageSizeBytes > TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
}
allocate(initialCapacity);
}

public BytesToBytesMap(TaskMemoryManager memoryManager, int initialCapacity) {
this(memoryManager, initialCapacity, 0.70, false);
public BytesToBytesMap(
TaskMemoryManager memoryManager,
int initialCapacity,
long pageSizeBytes) {
this(memoryManager, initialCapacity, 0.70, pageSizeBytes, false);
}

public BytesToBytesMap(
TaskMemoryManager memoryManager,
int initialCapacity,
long pageSizeBytes,
boolean enablePerfMetrics) {
this(memoryManager, initialCapacity, 0.70, enablePerfMetrics);
this(memoryManager, initialCapacity, 0.70, pageSizeBytes, enablePerfMetrics);
}

/**
@@ -443,20 +453,20 @@ public void putNewKey(
// must be stored in the same memory page.
// (8 byte key length) (key) (8 byte value length) (value)
final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
assert (requiredSize <= PAGE_SIZE_BYTES - 8); // Reserve 8 bytes for the end-of-page marker.
assert (requiredSize <= pageSizeBytes - 8); // Reserve 8 bytes for the end-of-page marker.
size++;
bitset.set(pos);

// If there's not enough space in the current page, allocate a new page (8 bytes are reserved
// for the end-of-page marker).
if (currentDataPage == null || PAGE_SIZE_BYTES - 8 - pageCursor < requiredSize) {
if (currentDataPage == null || pageSizeBytes - 8 - pageCursor < requiredSize) {
if (currentDataPage != null) {
// There wasn't enough space in the current page, so write an end-of-page marker:
final Object pageBaseObject = currentDataPage.getBaseObject();
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
MemoryBlock newPage = memoryManager.allocatePage(PAGE_SIZE_BYTES);
MemoryBlock newPage = memoryManager.allocatePage(pageSizeBytes);
dataPages.add(newPage);
pageCursor = 0;
currentDataPage = newPage;
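Each map entry is stored as an 8-byte key length, the key bytes, an 8-byte value length, and the value bytes, and the entry plus the reserved 8-byte end-of-page marker must fit inside a single data page, which is what makes the page size the effective record-size limit. A small sketch of that arithmetic (helper names are illustrative; the 64 MB figure is simply the new default):

```java
public class RecordFitExample {
  /** Bytes needed for one entry: (8-byte key length)(key)(8-byte value length)(value). */
  static long requiredSize(long keyLengthBytes, long valueLengthBytes) {
    return 8 + keyLengthBytes + 8 + valueLengthBytes;
  }

  /** An entry fits only if the page can also hold the 8-byte end-of-page marker. */
  static boolean fitsInPage(long keyLengthBytes, long valueLengthBytes, long pageSizeBytes) {
    return requiredSize(keyLengthBytes, valueLengthBytes) <= pageSizeBytes - 8;
  }

  public static void main(String[] args) {
    long pageSizeBytes = 64L * 1024 * 1024; // the "64m" default
    // A 16-byte key with a 1 KB value needs 8 + 16 + 8 + 1024 = 1056 bytes: fits easily.
    System.out.println(fitsInPage(16, 1024, pageSizeBytes));          // true
    // A value as large as the page itself can never fit.
    System.out.println(fitsInPage(16, pageSizeBytes, pageSizeBytes)); // false
  }
}
```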
@@ -538,10 +548,11 @@ public void free() {

/** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
public long getTotalMemoryConsumption() {
return (
dataPages.size() * PAGE_SIZE_BYTES +
bitset.memoryBlock().size() +
longArray.memoryBlock().size());
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size();
}
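Putting the pieces together: callers now pass the page size into the map's constructor, and getTotalMemoryConsumption reports the bytes actually held by the allocated pages rather than a page count multiplied by a fixed constant. A minimal usage sketch (import paths are assumed from the Spark unsafe module; the on-heap allocator mirrors the UnsafeHashedRelation change above):

```java
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

public class BytesToBytesMapExample {
  public static void main(String[] args) {
    TaskMemoryManager memoryManager =
        new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
    long pageSizeBytes = 64L * 1024 * 1024; // the "64m" default
    // The page size now travels through the constructor instead of being a
    // hard-coded 64 MB constant inside the map.
    BytesToBytesMap map = new BytesToBytesMap(memoryManager, 1024, pageSizeBytes);
    System.out.println("Initial memory consumption: " + map.getTotalMemoryConsumption());
    map.free();
  }
}
```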

/**