More comments, formatting, and code cleanup.
JoshRosen committed Apr 28, 2015
1 parent 529e571 commit ce3c565
Showing 8 changed files with 40 additions and 38 deletions.
UnsafeFixedWidthAggregationMap.java
@@ -102,7 +102,7 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
  * @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
  * @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
  * @param groupingKeySchema the schema of the grouping key, used for row conversion.
- * @param groupingKeySchema the memory manager used to allocate our Unsafe memory structures.
+ * @param memoryManager the memory manager used to allocate our Unsafe memory structures.
  * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
  * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
  */
@@ -186,7 +186,11 @@ public UnsafeRow getAggregationBuffer(Row groupingKey) {
     return currentAggregationBuffer;
   }
 
+  /**
+   * Mutable pair object returned by {@link UnsafeFixedWidthAggregationMap#iterator()}.
+   */
   public static class MapEntry {
+    private MapEntry() { };
     public final UnsafeRow key = new UnsafeRow();
     public final UnsafeRow value = new UnsafeRow();
   }
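A possible usage sketch for the MapEntry pair above. This is not code from the
commit; it assumes (as the "mutable pair" javadoc suggests) that the iterator
reuses a single MapEntry, so callers must consume or copy the rows before
advancing. The consume() callback is hypothetical.

    java.util.Iterator<UnsafeFixedWidthAggregationMap.MapEntry> iter = map.iterator();
    while (iter.hasNext()) {
      UnsafeFixedWidthAggregationMap.MapEntry entry = iter.next();
      // entry.key and entry.value may point at different data after the next
      // call to iter.next(), so use them (or copy them) immediately.
      consume(entry.key, entry.value);
    }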
UnsafeRow.java
@@ -47,21 +47,24 @@
  * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
  * base address of the row) that points to the beginning of the variable-length field.
  *
- * Instances of `UnsafeRow` act as pointers to row data stored in this format, similar to how
- * `Writable` objects work in Hadoop.
+ * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
 public final class UnsafeRow implements MutableRow {
 
   private Object baseObject;
   private long baseOffset;
 
+  /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
   private int numFields;
 
+  /** The width of the null tracking bit set, in bytes */
   private int bitSetWidthInBytes;
   /**
    * This optional schema is required if you want to call generic get() and set() methods on
    * this UnsafeRow, but is optional if callers will only use type-specific getTYPE() and setTYPE()
-   * methods.
+   * methods. This should be removed after the planned InternalRow / Row split; right now, it's only
+   * needed by the generic get() method, which is only called internally by code that accesses
+   * UTF8String-typed columns.
    */
   @Nullable
   private StructType schema;
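The layout this class comment describes (a null-tracking bitset followed by one
8-byte word per fixed-width field) can be sketched as standalone arithmetic.
This is an illustration, not Spark's code: the class and helper names are
invented, and it assumes the bitset is rounded up to whole 8-byte words.

    public final class RowLayoutSketch {
      // One null bit per field, rounded up to a whole number of 8-byte words.
      static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
      }

      // Offset of a fixed-width field's 8-byte slot, relative to the row's base address.
      static long fieldOffset(int numFields, int ordinal) {
        return bitSetWidthInBytes(numFields) + ordinal * 8L;
      }

      public static void main(String[] args) {
        // A 3-field row: one 8-byte bitset word, then fields at offsets 8, 16, 24.
        System.out.println(bitSetWidthInBytes(3)); // 8
        System.out.println(fieldOffset(3, 2));     // 24
      }
    }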
GeneratedAggregate.scala
@@ -291,8 +291,8 @@ case class GeneratedAggregate(
         aggregationBufferSchema,
         groupKeySchema,
         SparkEnv.get.unsafeMemoryManager,
-        1024 * 16,
-        false
+        1024 * 16, // initial capacity
+        false // disable tracking of performance metrics
       )
 
       while (iter.hasNext) {
SparkStrategies.scala
@@ -142,7 +142,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           partialComputation,
           planLater(child),
           unsafeEnabled),
-            unsafeEnabled) :: Nil
+          unsafeEnabled) :: Nil
 
       // Cases where some aggregate can not be codegened
       case PartialAggregation(
BytesToBytesMap.java
@@ -36,9 +36,9 @@
  * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
  * which is guaranteed to exhaust the space.
  * <p>
- * Note that even though we use long for indexing, the map can support up to 2^31 keys because
- * we use 32 bit MurmurHash. In either case, if the key cardinality is so high, you should probably
- * be using sorting instead of hashing for better cache locality.
+ * The map can support up to 2^31 keys because we use 32 bit MurmurHash. If the key cardinality is
+ * higher than this, you should probably be using sorting instead of hashing for better cache
+ * locality.
  * <p>
  * This class is not thread safe.
  */
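The "quadratic probing with triangular numbers" scheme mentioned above can be
sketched in a few lines. This is an illustrative toy, not the BytesToBytesMap
implementation: successive probe increments of 1, 2, 3, ... land at offsets
1, 3, 6, 10, ... (the triangular numbers) from the starting slot, and over a
power-of-2-sized table that sequence visits every slot before repeating.

    // Returns the first empty slot for `hash` in a power-of-2-sized table,
    // where 0 marks an empty slot (a simplifying assumption of this sketch).
    static int findEmptySlot(long[] table, int hash) {
      int mask = table.length - 1;   // table.length must be a power of two
      int pos = hash & mask;
      int step = 1;
      while (table[pos] != 0) {
        pos = (pos + step) & mask;   // offsets from start: 1, 3, 6, 10, ...
        step++;
      }
      return pos;
    }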
@@ -114,6 +114,8 @@ public final class BytesToBytesMap {

   /**
    * Mask for truncating hashcodes so that they do not exceed the long array's size.
+   * This is a strength reduction optimization; we're essentially performing a modulus operation,
+   * but doing so with a bitmask because this is a power-of-2-sized hash map.
    */
   private int mask;

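A tiny worked example of the strength reduction described in that comment: for
a power-of-two capacity, the bitmask agrees with a floor modulus while avoiding
an integer division. Math.floorMod is used rather than % so that a negative
hashcode compares correctly.

    int capacity = 1 << 6;    // 64, a power of two
    int mask = capacity - 1;  // 0b111111
    int hash = 0x9747b28c;    // an arbitrary (here negative) int hashcode
    assert (hash & mask) == Math.floorMod(hash, capacity);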
@@ -278,10 +280,14 @@ public final class Location {
     private void updateAddressesAndSizes(long fullKeyAddress) {
       final Object page = memoryManager.getPage(fullKeyAddress);
       final long keyOffsetInPage = memoryManager.getOffsetInPage(fullKeyAddress);
-      keyMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8);
-      keyLength = (int) PlatformDependent.UNSAFE.getLong(page, keyOffsetInPage);
-      valueMemoryLocation.setObjAndOffset(page, keyOffsetInPage + 8 + keyLength + 8);
-      valueLength = (int) PlatformDependent.UNSAFE.getLong(page, keyOffsetInPage + 8 + keyLength);
+      long position = keyOffsetInPage;
+      keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
+      position += 8; // word used to store the key size
+      keyMemoryLocation.setObjAndOffset(page, position);
+      position += keyLength;
+      valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
+      position += 8; // word used to store the value size
+      valueMemoryLocation.setObjAndOffset(page, position);
     }
 
     Location with(int pos, int keyHashcode, boolean isDefined) {
Expand Down Expand Up @@ -377,7 +383,8 @@ public void putNewKey(
     // Here, we'll copy the data into our data pages. Because we only store a relative offset from
     // the key address instead of storing the absolute address of the value, the key and value
     // must be stored in the same memory page.
-    final long requiredSize = 8 + 8 + keyLengthBytes + valueLengthBytes;
+    // (8 byte key length) (key) (8 byte value length) (value)
+    final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
     assert(requiredSize <= PAGE_SIZE_BYTES);
     size++;
     bitset.set(pos);
@@ -394,11 +401,11 @@
     final Object pageBaseObject = currentDataPage.getBaseObject();
     final long pageBaseOffset = currentDataPage.getBaseOffset();
     final long keySizeOffsetInPage = pageBaseOffset + pageCursor;
-    pageCursor += 8;
+    pageCursor += 8; // word used to store the key size
     final long keyDataOffsetInPage = pageBaseOffset + pageCursor;
     pageCursor += keyLengthBytes;
     final long valueSizeOffsetInPage = pageBaseOffset + pageCursor;
-    pageCursor += 8;
+    pageCursor += 8; // word used to store the value size
     final long valueDataOffsetInPage = pageBaseOffset + pageCursor;
     pageCursor += valueLengthBytes;
 
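The record layout that putNewKey() writes and updateAddressesAndSizes() reads
back, [8-byte key length][key bytes][8-byte value length][value bytes], can be
demonstrated with a self-contained sketch. This uses java.nio.ByteBuffer in
place of sun.misc.Unsafe and is an illustration, not the Spark code:

    import java.nio.ByteBuffer;

    public class RecordLayoutSketch {
      // Appends one record at `cursor` and returns the new cursor,
      // mirroring how putNewKey() advances pageCursor.
      static int writeRecord(ByteBuffer page, int cursor, byte[] key, byte[] value) {
        page.position(cursor);
        page.putLong(key.length);    // word used to store the key size
        page.put(key);
        page.putLong(value.length);  // word used to store the value size
        page.put(value);
        return page.position();
      }

      // Recovers the key and value starting at `offset`, mirroring
      // updateAddressesAndSizes().
      static byte[][] readRecord(ByteBuffer page, int offset) {
        int position = offset;
        int keyLength = (int) page.getLong(position);
        position += 8;               // skip the key-size word
        byte[] key = new byte[keyLength];
        page.position(position);
        page.get(key);
        position += keyLength;
        int valueLength = (int) page.getLong(position);
        position += 8;               // skip the value-size word
        byte[] value = new byte[valueLength];
        page.position(position);
        page.get(value);
        return new byte[][] { key, value };
      }

      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(64);
        int next = writeRecord(page, 0, new byte[] {1, 2}, new byte[] {3, 4, 5});
        byte[][] kv = readRecord(page, 0);
        // Prints: key=2 bytes, value=3 bytes, next cursor=21 (8 + 2 + 8 + 3)
        System.out.println("key=" + kv[0].length + " bytes, value=" + kv[1].length
            + " bytes, next cursor=" + next);
      }
    }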
MemoryBlock.java
@@ -34,10 +34,6 @@ public class MemoryBlock extends MemoryLocation {
    */
   int pageNumber = -1;
 
-  public int getPageNumber() {
-    return pageNumber;
-  }
-
   MemoryBlock(@Nullable Object obj, long offset, long length) {
     super(obj, offset);
     this.length = length;
@@ -58,13 +54,6 @@ public MemoryBlock zero() {
     return this;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the byte array.
-   */
-  public static MemoryBlock fromByteArray(final byte[] array) {
-    return new MemoryBlock(array, PlatformDependent.BYTE_ARRAY_OFFSET, array.length);
-  }
-
   /**
    * Creates a memory block pointing to the memory used by the long array.
    */
MemoryManager.java
@@ -35,13 +35,17 @@
  * store a "page number" and the lower 51 bits to store an offset within this page. These page
  * numbers are used to index into a "page table" array inside of the MemoryManager in order to
  * retrieve the base object.
+ * <p>
+ * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
+ * maximum size of a long[] array, allowing us to address 8192 * 2^31 * 8 bytes, which is
+ * approximately 140 terabytes of memory.
  */
 public final class MemoryManager {
 
   /**
    * The number of entries in the page table.
    */
-  private static final int PAGE_TABLE_SIZE = (int) 1L << 13;
+  private static final int PAGE_TABLE_SIZE = 1 << 13;
 
   /** Bit mask for the lower 51 bits of a long. */
   private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
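The pointer encoding described above (the upper 13 bits hold a page number,
the lower 51 bits an offset within that page) comes down to a few lines of bit
arithmetic. A sketch with invented helper names, not the MemoryManager API:

    public class PageAddressSketch {
      private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

      static long encodePageAddress(int pageNumber, long offsetInPage) {
        // 64 - 51 = 13 high bits for the page number: up to 8192 pages.
        return (((long) pageNumber) << 51) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
      }

      static int pageNumber(long address) {
        return (int) (address >>> 51);
      }

      static long offsetInPage(long address) {
        return address & MASK_LONG_LOWER_51_BITS;
      }

      public static void main(String[] args) {
        long addr = encodePageAddress(42, 1234L);
        System.out.println(pageNumber(addr) + " / " + offsetInPage(addr)); // 42 / 1234
      }
    }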
TestLongArray.java
@@ -24,18 +24,13 @@

 public class TestLongArray {
 
-  private static LongArray createTestData() {
-    byte[] bytes = new byte[16];
-    LongArray arr = new LongArray(MemoryBlock.fromByteArray(bytes));
+  @Test
+  public void basicTest() {
+    long[] bytes = new long[2];
+    LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
     arr.set(0, 1L);
     arr.set(1, 2L);
     arr.set(1, 3L);
-    return arr;
-  }
-
-  @Test
-  public void basicTest() {
-    LongArray arr = createTestData();
     Assert.assertEquals(2, arr.size());
     Assert.assertEquals(1L, arr.get(0));
     Assert.assertEquals(3L, arr.get(1));