Add SizeTrackingAppendOnlyBuffer and tests
This buffer is backed by an underlying PrimitiveVector. It is intended to be
used for unrolling partitions, so that we can efficiently check the size of
the in-memory buffer periodically.

Note that the underlying buffer cannot be an existing mutable Scala or Java
collection: we need to be notified whenever the underlying array is resized,
or size estimation may not be accurate.
andrewor14 committed Jun 21, 2014
1 parent 08d0aca commit c12f093
Showing 7 changed files with 346 additions and 188 deletions.
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
     }
   }

-  // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
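An aside on the comment this hunk fixes: SizeEstimator avoids walking every element of a very large array by measuring a random sample and scaling up. The sketch below is an illustrative simplification, not SizeEstimator's actual code; `sizeOfElement` is a hypothetical stand-in for its per-object estimation, and the real implementation additionally handles primitive arrays and visited-object tracking, which this sketch ignores.

import scala.util.Random

object ArraySizeSampling {
  private val ARRAY_SIZE_FOR_SAMPLING = 200
  private val ARRAY_SAMPLE_SIZE = 100

  // Hypothetical stand-in for SizeEstimator's per-object estimation.
  private def sizeOfElement(obj: AnyRef): Long =
    if (obj == null) 0L else 16L // placeholder: pretend every object is 16 bytes

  /** Estimate the total size of an array, sampling when it is large. */
  def estimateArraySize(array: Array[AnyRef]): Long = {
    if (array.length <= ARRAY_SIZE_FOR_SAMPLING) {
      // Small array: measure every element exactly.
      array.map(sizeOfElement).sum
    } else {
      // Large array: measure ARRAY_SAMPLE_SIZE random elements and scale up.
      val rand = new Random(42)
      var sampleTotal = 0L
      for (_ <- 1 to ARRAY_SAMPLE_SIZE) {
        sampleTotal += sizeOfElement(array(rand.nextInt(array.length)))
      }
      (sampleTotal.toDouble / ARRAY_SAMPLE_SIZE * array.length).toLong
    }
  }
}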
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
     _array(index)
   }

-  def +=(value: V) {
+  def +=(value: V): Unit = {
     if (_numElements == _array.length) {
       resize(_array.length * 2)
     }
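A side note on the surrounding code: `resize(_array.length * 2)` doubles the capacity whenever the vector is full, which is what makes `+=` amortized O(1). A tiny illustration, assuming an initial capacity of 64 (PrimitiveVector's default):

// Illustrative only: capacity reached after n appends under doubling growth.
def capacityAfter(n: Int, initial: Int = 64): Int = {
  var capacity = initial
  while (capacity < n) capacity *= 2
  capacity
}

// capacityAfter(1000) == 1024: a thousand appends trigger only four resizes
// (64 -> 128 -> 256 -> 512 -> 1024), so the amortized cost per append is O(1).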
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.collection

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.SizeEstimator

/**
 * A general interface for collections that keep track of their estimated size in bytes.
 * We sample with a slow exponential back-off using SizeEstimator to amortize the time,
 * as each call to SizeEstimator can take a sizable amount of time (on the order of a few
 * milliseconds).
 */
private[spark] trait SizeTracker {

  import SizeTracker._

  /**
   * Controls the base of the exponential which governs the rate of sampling.
   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
   */
  private val SAMPLE_GROWTH_RATE = 1.1

  /** All samples taken since the last resetSamples(). Only the last two are used for extrapolation. */
  private val samples = new ArrayBuffer[Sample]

  /** The average number of bytes per update between our last two samples. */
  private var bytesPerUpdate: Double = _

  /** Total number of insertions and updates into the collection since the last resetSamples(). */
  private var numUpdates: Long = _

  /** The value of 'numUpdates' at which we will take our next sample. */
  private var nextSampleNum: Long = _

  resetSamples()

  /** Called after the collection undergoes a dramatic change in size. */
  protected def resetSamples(): Unit = {
    numUpdates = 1
    nextSampleNum = 1
    samples.clear()
    takeSample()
  }

  /** Callback to be invoked after an update. */
  protected def afterUpdate(): Unit = {
    numUpdates += 1
    if (nextSampleNum == numUpdates) {
      takeSample()
    }
  }

  /** Takes a new sample of the current collection's size. */
  private def takeSample(): Unit = {
    samples += Sample(SizeEstimator.estimate(this), numUpdates)
    // Only use the last two samples to extrapolate
    if (samples.size > 2) {
      samples.remove(0)
    }
    // Use a List here: the :: pattern below only matches immutable lists.
    val bytesDelta = samples.toList.reverse match {
      case latest :: previous :: tail =>
        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
      // If fewer than 2 samples, assume no change
      case _ => 0
    }
    bytesPerUpdate = math.max(0, bytesDelta)
    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
  }

  /** Estimates the current size of the collection in bytes. O(1) time. */
  def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong
  }
}

private object SizeTracker {
  case class Sample(size: Long, numUpdates: Long)
}
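Two concrete consequences of the numbers above, sketched with the same formulas as takeSample() and estimateSize(). The values in the comments are worked examples, not output from this commit's tests:

// With SAMPLE_GROWTH_RATE = 1.1, successive sample points follow
// nextSampleNum = ceil(numUpdates * 1.1), giving a slow exponential back-off:
val samplePoints = Iterator.iterate(1L)(n => math.ceil(n * 1.1).toLong).take(20).toList
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 17, 19, 21, 24, 27, 30, 33
// Early on we sample every update; by update ~1000, samples are ~100 updates apart.

// Extrapolation between samples: suppose the last two samples were
//   Sample(size = 900000, numUpdates = 450) and Sample(size = 1000000, numUpdates = 500).
// Then bytesPerUpdate = (1000000 - 900000) / (500 - 450) = 2000.0, and at
// numUpdates = 520, estimateSize() returns 1000000 + 2000 * (520 - 500) = 1040000.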
@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.collection

import scala.reflect.ClassTag

/**
* An append-only buffer that keeps track of its estimated size in bytes.
*/
private[spark] class SizeTrackingAppendOnlyBuffer[T: ClassTag]
  extends PrimitiveVector[T]
  with SizeTracker {

  override def +=(value: T): Unit = {
    super.+=(value)
    super.afterUpdate()
  }

  override def resize(newLength: Int): PrimitiveVector[T] = {
    super.resize(newLength)
    resetSamples()
    this
  }
}
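To connect this back to the commit message's unrolling use case, here is a hypothetical sketch of the intended usage pattern: append elements one at a time and periodically consult the O(1) size estimate. The threshold, check period, and iterator below are illustrative assumptions, not part of this commit:

// Hypothetical unrolling loop: names and thresholds are illustrative.
val buffer = new SizeTrackingAppendOnlyBuffer[Any]
val memoryThreshold = 64L * 1024 * 1024  // assumed 64 MB limit
val checkPeriod = 16                     // check the size every 16 elements
var elementsRead = 0L
var keepUnrolling = true

val values: Iterator[Any] = ???  // the partition being unrolled
while (keepUnrolling && values.hasNext) {
  buffer += values.next()
  elementsRead += 1
  // estimateSize() is O(1), so it is cheap to call periodically.
  if (elementsRead % checkPeriod == 0 && buffer.estimateSize() > memoryThreshold) {
    keepUnrolling = false  // stop unrolling (e.g. spill) instead of running out of memory
  }
}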
@@ -17,85 +17,24 @@

 package org.apache.spark.util.collection

-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
 /**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
  */
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
-  private val samples = new ArrayBuffer[Sample]()
-
-  /** Total number of insertions and updates into the map since the last resetSamples(). */
-  private var numUpdates: Long = _
-
-  /** The value of 'numUpdates' at which we will take our next sample. */
-  private var nextSampleNum: Long = _
-
-  /** The average number of bytes per update between our last two samples. */
-  private var bytesPerUpdate: Double = _
-
-  resetSamples()
-
-  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
-  def resetSamples() {
-    numUpdates = 1
-    nextSampleNum = 1
-    samples.clear()
-    takeSample()
-  }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {

   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
   }

   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
     newValue
   }

-  /** Takes a new sample of the current map's size. */
-  def takeSample() {
-    samples += Sample(SizeEstimator.estimate(this), numUpdates)
-    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
-    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
-      case latest :: previous :: tail =>
-        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
-      case _ =>
-        0
-    })
-    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-  }
-
-  override protected def growTable() {
+  override protected def growTable(): Unit = {
     super.growTable()
     resetSamples()
   }
-
-  /** Estimates the current size of the map in bytes. O(1) time. */
-  def estimateSize(): Long = {
-    assert(samples.nonEmpty)
-    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
-    (samples.last.size + extrapolatedDelta).toLong
-  }
 }
-
-private object SizeTrackingAppendOnlyMap {
-  case class Sample(size: Long, numUpdates: Long)
-}
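The net effect of this hunk is deduplication: the sampling state and arithmetic move into the shared SizeTracker trait, and the map keeps only its hooks (afterUpdate() after each mutation, resetSamples() when the hash table grows). Both collections now share one interface, e.g.:

// Illustrative: any size-tracking collection can be treated uniformly.
val trackers: Seq[SizeTracker] = Seq(
  new SizeTrackingAppendOnlyBuffer[Int],
  new SizeTrackingAppendOnlyMap[String, Int])
val totalEstimatedBytes = trackers.map(_.estimateSize()).sum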

The remaining two changed files are not shown above: one existing file was deleted, and the diff for the other did not load.
