Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 14, 2015
2 parents 05c2676 + bce00da commit bf0ab6e
Show file tree
Hide file tree
Showing 79 changed files with 3,734 additions and 422 deletions.
10 changes: 10 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import scala.reflect.ClassTag;

import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.unsafe.PlatformDependent;

/**
* Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
* Our shuffle write path doesn't actually use this serializer (since we end up calling the
* `write() OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
* around this, we pass a dummy no-op serializer.
*/
final class DummySerializerInstance extends SerializerInstance {

public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

private DummySerializerInstance() { }

@Override
public SerializationStream serializeStream(final OutputStream s) {
return new SerializationStream() {
@Override
public void flush() {
// Need to implement this because DiskObjectWriter uses it to flush the compression stream
try {
s.flush();
} catch (IOException e) {
PlatformDependent.throwException(e);
}
}

@Override
public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
// Need to implement this because DiskObjectWriter uses it to close the compression stream
try {
s.close();
} catch (IOException e) {
PlatformDependent.throwException(e);
}
}
};
}

@Override
public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
throw new UnsupportedOperationException();
}

@Override
public DeserializationStream deserializeStream(InputStream s) {
throw new UnsupportedOperationException();
}

@Override
public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
throw new UnsupportedOperationException();
}

@Override
public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

/**
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
* <p>
* Within the long, the data is laid out as follows:
* <pre>
* [24 bit partition number][13 bit memory page number][27 bit offset in page]
* </pre>
* This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
* our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
* 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
* implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
* <p>
* Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
* optimization to future work as it will require more careful design to ensure that addresses are
* properly aligned (e.g. by padding records).
*/
final class PackedRecordPointer {

static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes

/**
* The maximum partition identifier that can be encoded. Note that partition ids start from 0.
*/
static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215

/** Bit mask for the lower 40 bits of a long. */
private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;

/** Bit mask for the upper 24 bits of a long */
private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;

/** Bit mask for the lower 27 bits of a long. */
private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;

/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;

/** Bit mask for the upper 13 bits of a long */
private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

/**
* Pack a record address and partition id into a single word.
*
* @param recordPointer a record pointer encoded by TaskMemoryManager.
* @param partitionId a shuffle partition id (maximum value of 2^24).
* @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
*/
public static long packPointer(long recordPointer, int partitionId) {
assert (partitionId <= MAXIMUM_PARTITION_ID);
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
return (((long) partitionId) << 40) | compressedAddress;
}

private long packedRecordPointer;

public void set(long packedRecordPointer) {
this.packedRecordPointer = packedRecordPointer;
}

public int getPartitionId() {
return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
}

public long getRecordPointer() {
final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
return pageNumber | offsetInPage;
}

}
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import java.io.File;

import org.apache.spark.storage.TempShuffleBlockId;

/**
* Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
*/
final class SpillInfo {
final long[] partitionLengths;
final File file;
final TempShuffleBlockId blockId;

public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
this.partitionLengths = new long[numPartitions];
this.file = file;
this.blockId = blockId;
}
}
Loading

0 comments on commit bf0ab6e

Please sign in to comment.