Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into streaming-closu…
Browse files Browse the repository at this point in the history
…re-cleaner

Conflicts:
	core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
  • Loading branch information
Andrew Or committed May 3, 2015
2 parents 67eeff4 + 49549d5 commit eed3390
Show file tree
Hide file tree
Showing 23 changed files with 1,207 additions and 131 deletions.
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ case class Aggregator[K, V, C] (
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
combiners.insertAll(iter)
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
Expand Down
305 changes: 250 additions & 55 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Large diffs are not rendered by default.

61 changes: 51 additions & 10 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
private val FLOAT_SIZE = 4
private val DOUBLE_SIZE = 8

// Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of
// a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag.
// The sizes should be in descending order, as we will use that information for fields placement.
private val fieldSizes = List(8, 4, 2, 1)

// Alignment boundary for objects
// TODO: Is this arch dependent ?
private val ALIGN_SIZE = 8
Expand Down Expand Up @@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
// general all ClassLoaders and Classes will be shared between objects anyway.
} else {
val classInfo = getClassInfo(cls)
state.size += classInfo.shellSize
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
}
Expand Down Expand Up @@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
}
size
}
private def primitiveSize(cls: Class[_]): Long = {

private def primitiveSize(cls: Class[_]): Int = {
if (cls == classOf[Byte]) {
BYTE_SIZE
} else if (cls == classOf[Boolean]) {
Expand Down Expand Up @@ -274,30 +279,66 @@ private[spark] object SizeEstimator extends Logging {
val parent = getClassInfo(cls.getSuperclass)
var shellSize = parent.shellSize
var pointerFields = parent.pointerFields
val sizeCount = Array.fill(fieldSizes.max + 1)(0)

// iterate through the fields of this class and gather information.
for (field <- cls.getDeclaredFields) {
if (!Modifier.isStatic(field.getModifiers)) {
val fieldClass = field.getType
if (fieldClass.isPrimitive) {
shellSize += primitiveSize(fieldClass)
sizeCount(primitiveSize(fieldClass)) += 1
} else {
field.setAccessible(true) // Enable future get()'s on this field
shellSize += pointerSize
sizeCount(pointerSize) += 1
pointerFields = field :: pointerFields
}
}
}

shellSize = alignSize(shellSize)
// Based on the simulated field layout code in Aleksey Shipilev's report:
// http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
// The code is in Figure 9.
// The simplified idea of field layout consists of 4 parts (see more details in the report):
//
// 1. field alignment: HotSpot lays out the fields aligned by their size.
// 2. object alignment: HotSpot rounds instance size up to 8 bytes
// 3. consistent fields layouts throughout the hierarchy: This means we should layout
// superclass first. And we can use superclass's shellSize as a starting point to layout the
// other fields in this class.
// 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed
// with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
//
// The real world field layout is much more complicated. There are three kinds of fields
// order in Java 8. And we don't consider the @contended annotation introduced by Java 8.
// see the HotSpot classloader code, layout_fields method for more details.
// hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
var alignedSize = shellSize
for (size <- fieldSizes if sizeCount(size) > 0) {
val count = sizeCount(size)
// If there are internal gaps, smaller field can fit in.
alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
shellSize += size * count
}

// Should choose a larger size to be new shellSize and clearly alignedSize >= shellSize, and
// round up the instance filed blocks
shellSize = alignSizeUp(alignedSize, pointerSize)

// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
classInfos.put(cls, newInfo)
newInfo
}

private def alignSize(size: Long): Long = {
val rem = size % ALIGN_SIZE
if (rem == 0) size else (size + ALIGN_SIZE - rem)
}
private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)

/**
* Compute aligned size. The alignSize must be 2^n, otherwise the result will be wrong.
* When alignSize = 2^n, alignSize - 1 = 2^n - 1. The binary representation of (alignSize - 1)
* will only have n trailing 1s(0b00...001..1). ~(alignSize - 1) will be 0b11..110..0. Hence,
* (size + alignSize - 1) & ~(alignSize - 1) will set the last n bits to zeros, which leads to
* multiple of alignSize.
*/
private def alignSizeUp(size: Long, alignSize: Int): Long =
(size + alignSize - 1) & ~(alignSize - 1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite {
val obj = new TestClassWithNesting(1)
assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
}

test("toplevel return statements in closures are identified at cleaning time") {
val ex = intercept[SparkException] {
TestObjectWithBogusReturns.run()
Expand All @@ -61,7 +61,7 @@ class ClosureCleanerSuite extends FunSuite {

test("return statements from named functions nested in closures don't raise exceptions") {
val result = TestObjectWithNestedReturns.run()
assert(result == 1)
assert(result === 1)
}

test("should clean only closures") {
Expand All @@ -79,7 +79,14 @@ class ClosureCleanerSuite extends FunSuite {

// A non-serializable class we create in closures to make sure that we aren't
// keeping references to unneeded variables from our outer closures.
class NonSerializable {}
class NonSerializable(val id: Int = -1) {
override def equals(other: Any): Boolean = {
other match {
case o: NonSerializable => id == o.id
case _ => false
}
}
}

object TestObject {
def run(): Int = {
Expand Down
Loading

0 comments on commit eed3390

Please sign in to comment.