diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index cc9fa649125c2..0a4ab84f76cbe 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -102,7 +102,7 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
    * @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
    * @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
    * @param groupingKeySchema the schema of the grouping key, used for row conversion.
-   * @param groupingKeySchema the memory manager used to allocate our Unsafe memory structures.
+   * @param memoryManager the memory manager used to allocate our Unsafe memory structures.
    * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
    * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
    */
@@ -186,7 +186,11 @@ public UnsafeRow getAggregationBuffer(Row groupingKey) {
     return currentAggregationBuffer;
   }
 
+  /**
+   * Mutable pair object returned by {@link UnsafeFixedWidthAggregationMap#iterator()}.
+   */
   public static class MapEntry {
+    private MapEntry() { };
     public final UnsafeRow key = new UnsafeRow();
     public final UnsafeRow value = new UnsafeRow();
   }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d2f25fd2e692e..865a790a5875c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -47,21 +47,24 @@
  * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
  * base address of the row) that points to the beginning of the variable-length field.
  *
- * Instances of `UnsafeRow` act as pointers to row data stored in this format, similar to how
- * `Writable` objects work in Hadoop.
+ * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
 public final class UnsafeRow implements MutableRow {
 
   private Object baseObject;
   private long baseOffset;
 
+  /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
   private int numFields;
+  /** The width of the null tracking bit set, in bytes */
   private int bitSetWidthInBytes;
 
   /**
    * This optional schema is required if you want to call generic get() and set() methods on
    * this UnsafeRow, but is optional if callers will only use type-specific getTYPE() and setTYPE()
-   * methods.
+   * methods. This should be removed after the planned InternalRow / Row split; right now, it's only
+   * needed by the generic get() method, which is only called internally by code that accesses
+   * UTF8String-typed columns.
    */
   @Nullable
   private StructType schema;
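[Editor's note] The new field comments above refer to the width of the null-tracking bit set. For context, here is a minimal sketch of how a value like bitSetWidthInBytes can be derived from numFields, assuming one 64-bit word per 64 fields, rounded up. The formula and class name are illustrative assumptions; this patch does not show UnsafeRow's actual calculation.

    // Hypothetical sketch; not code from this patch.
    final class BitSetWidthDemo {
      // One 8-byte word can track null bits for up to 64 fields.
      static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
      }

      public static void main(String[] args) {
        System.out.println(bitSetWidthInBytes(1));   // 8
        System.out.println(bitSetWidthInBytes(64));  // 8
        System.out.println(bitSetWidthInBytes(65));  // 16
      }
    }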
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 6bb0a5d32cb52..226e41f9b09f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -291,8 +291,8 @@ case class GeneratedAggregate(
           aggregationBufferSchema,
           groupKeySchema,
           SparkEnv.get.unsafeMemoryManager,
-          1024 * 16,
-          false
+          1024 * 16, // initial capacity
+          false // disable tracking of performance metrics
         )
 
         while (iter.hasNext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4c0369f0dbde4..922f5975573ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -142,7 +142,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
               partialComputation,
               planLater(child),
               unsafeEnabled),
-            unsafeEnabled) :: Nil
+          unsafeEnabled) :: Nil
 
       // Cases where some aggregate can not be codegened
       case PartialAggregation(
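[Editor's note] Since MapEntry (documented in the first file of this patch) is a mutable pair that the map's iterator reuses, callers like GeneratedAggregate above must treat the exposed rows as transient. A usage sketch, assuming iterator() returns Iterator<MapEntry> as the new {@link} tag suggests; the method and its name are illustrative, not part of the patch:

    // Hedged sketch; assumes Iterator<MapEntry> per the new javadoc. Not patch code.
    static void drain(UnsafeFixedWidthAggregationMap map) {
      java.util.Iterator<UnsafeFixedWidthAggregationMap.MapEntry> iter = map.iterator();
      while (iter.hasNext()) {
        UnsafeFixedWidthAggregationMap.MapEntry entry = iter.next();
        // entry.key and entry.value point into the map's memory and are overwritten
        // by the next call to next(), so copy them if they must outlive this iteration.
      }
    }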
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 8301c6b9073e8..f464e34e43cd3 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -36,9 +36,9 @@
  * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
  * which is guaranteed to exhaust the space.
  * <p>
- * Note that even though we use long for indexing, the map can support up to 2^31 keys because
- * we use 32 bit MurmurHash. In either case, if the key cardinality is so high, you should probably
- * be using sorting instead of hashing for better cache locality.
+ * The map can support up to 2^31 keys because we use 32 bit MurmurHash. If the key cardinality is
+ * higher than this, you should probably be using sorting instead of hashing for better cache
+ * locality.
  * <p>
  * This class is not thread safe.
  */
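[Editor's note] The claim above, that quadratic probing with triangular numbers is guaranteed to exhaust a power-of-2-sized table, can be checked in a few lines. A self-contained sketch (illustrative, not patch code); the `& mask` is the same power-of-2 modulus trick that the mask field's new comment describes further down:

    // Self-contained check; illustrative, not patch code.
    final class TriangularProbeDemo {
      public static void main(String[] args) {
        int capacity = 8;             // must be a power of 2
        int mask = capacity - 1;
        java.util.Set<Integer> visited = new java.util.HashSet<>();
        int pos = 0;
        // Cumulative probe offsets are the triangular numbers: 0, 1, 3, 6, 10, ...
        for (int step = 1; visited.size() < capacity; step++) {
          visited.add(pos);
          pos = (pos + step) & mask;  // same as (pos + step) % capacity, but cheaper
        }
        System.out.println(visited.size() + " of " + capacity + " slots visited"); // 8 of 8
      }
    }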
@@ -114,6 +114,8 @@ public final class BytesToBytesMap {
 
   /**
    * Mask for truncating hashcodes so that they do not exceed the long array's size.
+   * This is a strength reduction optimization; we're essentially performing a modulus operation,
+   * but doing so with a bitmask because this is a power-of-2-sized hash map.
    */
   private int mask;
 
@@ -278,10 +280,14 @@ public final class Location {
     private void updateAddressesAndSizes(long fullKeyAddress) {
       final Object page = memoryManager.getPage(fullKeyAddress);
       final long keyOffsetInPage = memoryManager.getOffsetInPage(fullKeyAddress);
-      keyMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8);
-      keyLength = (int) PlatformDependent.UNSAFE.getLong(page, keyOffsetInPage);
-      valueMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8 + keyLength + 8);
-      valueLength = (int) PlatformDependent.UNSAFE.getLong(page, keyOffsetInPage + 8 + keyLength);
+      long position = keyOffsetInPage;
+      keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
+      position += 8; // word used to store the key size
+      keyMemoryLocation.setObjAndOffset(page, position);
+      position += keyLength;
+      valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
+      position += 8; // word used to store the value size
+      valueMemoryLocation.setObjAndOffset(page, position);
     }
 
     Location with(int pos, int keyHashcode, boolean isDefined) {
@@ -377,7 +383,8 @@ public void putNewKey(
     // Here, we'll copy the data into our data pages. Because we only store a relative offset from
     // the key address instead of storing the absolute address of the value, the key and value
     // must be stored in the same memory page.
-    final long requiredSize = 8 + 8 + keyLengthBytes + valueLengthBytes;
+    // (8 byte key length) (key) (8 byte value length) (value)
+    final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
     assert(requiredSize <= PAGE_SIZE_BYTES);
     size++;
     bitset.set(pos);
@@ -394,11 +401,11 @@
       final Object pageBaseObject = currentDataPage.getBaseObject();
       final long pageBaseOffset = currentDataPage.getBaseOffset();
       final long keySizeOffsetInPage = pageBaseOffset + pageCursor;
-      pageCursor += 8;
+      pageCursor += 8; // word used to store the key size
       final long keyDataOffsetInPage = pageBaseOffset + pageCursor;
       pageCursor += keyLengthBytes;
       final long valueSizeOffsetInPage = pageBaseOffset + pageCursor;
-      pageCursor += 8;
+      pageCursor += 8; // word used to store the value size
       final long valueDataOffsetInPage = pageBaseOffset + pageCursor;
       pageCursor += valueLengthBytes;
 
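[Editor's note] To make the record layout from the comments above concrete, (8 byte key length) (key) (8 byte value length) (value), here is a self-contained sketch that encodes one record and then decodes it with a single moving cursor, mirroring the rewritten updateAddressesAndSizes(). It uses a ByteBuffer in place of an off-heap page; everything here is illustrative, not patch code.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class RecordLayoutDemo {
      public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        byte[] value = {4, 5};
        // (8 byte key length) (key) (8 byte value length) (value)
        ByteBuffer page = ByteBuffer.allocate(8 + key.length + 8 + value.length)
            .order(ByteOrder.nativeOrder());
        page.putLong(key.length).put(key).putLong(value.length).put(value);

        // Decode by walking the same layout with one cursor.
        page.flip();
        int keyLength = (int) page.getLong();   // word used to store the key size
        byte[] k = new byte[keyLength];
        page.get(k);
        int valueLength = (int) page.getLong(); // word used to store the value size
        byte[] v = new byte[valueLength];
        page.get(v);
        System.out.println(keyLength + " key bytes, " + valueLength + " value bytes");
      }
    }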
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index 49963cc099b29..0beb743e5644e 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -34,10 +34,6 @@ public class MemoryBlock extends MemoryLocation {
    */
   int pageNumber = -1;
 
-  public int getPageNumber() {
-    return pageNumber;
-  }
-
   MemoryBlock(@Nullable Object obj, long offset, long length) {
     super(obj, offset);
     this.length = length;
@@ -58,13 +54,6 @@ public MemoryBlock zero() {
     return this;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the byte array.
-   */
-  public static MemoryBlock fromByteArray(final byte[] array) {
-    return new MemoryBlock(array, PlatformDependent.BYTE_ARRAY_OFFSET, array.length);
-  }
-
   /**
    * Creates a memory block pointing to the memory used by the long array.
    */
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryManager.java
index 3b6c8b09f50e8..e3b3da52e19ee 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryManager.java
@@ -35,13 +35,17 @@
  * store a "page number" and the lower 51 bits to store an offset within this page. These page
 * numbers are used to index into a "page table" array inside of the MemoryManager in order to
 * retrieve the base object.
+ * <p>
+ * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
+ * maximum size of a long[] array, allowing us to address 8192 * 2^31 * 8 bytes, which is
+ * approximately 140 terabytes of memory.
  */
 public final class MemoryManager {
 
   /**
    * The number of entries in the page table.
    */
-  private static final int PAGE_TABLE_SIZE = (int) 1L << 13;
+  private static final int PAGE_TABLE_SIZE = 1 << 13;
 
   /** Bit mask for the lower 51 bits of a long. */
   private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
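[Editor's note] The MemoryManager javadoc above describes packing a 13-bit page number and a 51-bit in-page offset into one long. A worked sketch of that encoding follows; the mask constant matches the one in the file, but the encode/decode helpers and their names are assumptions for illustration, not MemoryManager's actual API. Note the capacity arithmetic: 8192 pages times 2^31 * 8 bytes per page (a maximal long[]) is 2^47 bytes, roughly 140 terabytes.

    // Hypothetical sketch; not MemoryManager's real API.
    final class PageAddressDemo {
      private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

      static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << 51) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
      }

      static int pageNumber(long address) {
        return (int) (address >>> 51);      // upper 13 bits
      }

      static long offsetInPage(long address) {
        return address & MASK_LONG_LOWER_51_BITS; // lower 51 bits
      }

      public static void main(String[] args) {
        long addr = encode(5, 1024);
        System.out.println(pageNumber(addr));   // 5
        System.out.println(offsetInPage(addr)); // 1024
      }
    }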
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/array/TestLongArray.java b/unsafe/src/test/java/org/apache/spark/unsafe/array/TestLongArray.java
index e49e344041ad7..53492226a43d5 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/array/TestLongArray.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/array/TestLongArray.java
@@ -24,18 +24,13 @@
 
 public class TestLongArray {
 
-  private static LongArray createTestData() {
-    byte[] bytes = new byte[16];
-    LongArray arr = new LongArray(MemoryBlock.fromByteArray(bytes));
+  @Test
+  public void basicTest() {
+    long[] bytes = new long[2];
+    LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
     arr.set(0, 1L);
     arr.set(1, 2L);
     arr.set(1, 3L);
-    return arr;
-  }
-
-  @Test
-  public void basicTest() {
-    LongArray arr = createTestData();
     Assert.assertEquals(2, arr.size());
     Assert.assertEquals(1L, arr.get(0));
     Assert.assertEquals(3L, arr.get(1));