Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into pin-pages
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jan 19, 2016
2 parents 0ba8318 + f6f7ca9 commit feb1172
Show file tree
Hide file tree
Showing 238 changed files with 4,524 additions and 2,750 deletions.
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,7 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
expect_equal(coltypes(x), "map<string,string>")

df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))

df1 <- select(df, cast(df$age, "integer"))
coltypes(df) <- c("character", "integer")
Expand Down
2 changes: 1 addition & 1 deletion R/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ public BypassMergeSortShuffleWriter(
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = new ShuffleWriteMetrics();
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.serializer = Serializer.getSerializer(dep.serializer());
this.shuffleBlockResolver = shuffleBlockResolver;
}
Expand Down Expand Up @@ -143,7 +142,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
Expand Down Expand Up @@ -203,7 +202,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public UnsafeShuffleWriter(
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
this.partitioner = dep.partitioner();
this.writeMetrics = new ShuffleWriteMetrics();
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
Expand Down Expand Up @@ -298,8 +297,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incShuffleBytesWritten(outputFile.length());
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
Expand Down Expand Up @@ -411,7 +410,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ private UnsafeExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
// TODO: metrics tracking + integration with shuffle write metrics
// need to connect the write metrics to task metrics so we count the spill IO somewhere.
this.writeMetrics = new ShuffleWriteMetrics();
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();

if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ package org.apache.spark
import java.io.{ObjectInputStream, Serializable}

import scala.collection.generic.Growable
import scala.collection.Map
import scala.collection.mutable
import scala.ref.WeakReference
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils


/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
Expand Down Expand Up @@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
override def toString: String = if (value_ == null) "null" else value_.toString
}


/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
Expand Down Expand Up @@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}


private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {
Expand All @@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
copy
}
}

/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged, i.e. variables that are only "added" to through an
* associative operation and can therefore be efficiently supported in parallel. They can be used
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
* value types, and programmers can add support for new types.
*
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
* Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
* However, they cannot read its value. Only the driver program can read the accumulator's value,
* using its value method.
*
* The interpreter session below shows an accumulator being used to add up the elements of an array:
*
* {{{
* scala> val accum = sc.accumulator(0)
* accum: spark.Accumulator[Int] = 0
*
* scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
* ...
* 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
*
* scala> accum.value
* res2: Int = 10
* }}}
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
}

def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
}
}

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}

object AccumulatorParam {

// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, as there are duplicate codes in SparkContext for backward
// compatibility, please update them accordingly if you modify the following implicit objects.

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
}

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators extends Logging {
/**
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
*/
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

private var lastId: Long = 0

def newId(): Long = synchronized {
lastId += 1
lastId
}

def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}

def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}

}

private[spark] object InternalAccumulator {
val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
val TEST_ACCUMULATOR = "testAccumulator"

// For testing only.
// This needs to be a def since we don't want to reuse the same accumulator across stages.
private def maybeTestAccumulator: Option[Accumulator[Long]] = {
if (sys.props.contains("spark.testing")) {
Some(new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
} else {
None
}
}

/**
* Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[Long]] = {
val internalAccumulators = Seq(
// Execution memory refers to the memory used by internal data structures created
// during shuffles, aggregations and joins. The value of this accumulator should be
// approximately the sum of the peak sizes across all such data structures created
// in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
) ++ maybeTestAccumulator.toSeq
internalAccumulators.foreach { accumulator =>
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
}
internalAccumulators
}
}
Loading

0 comments on commit feb1172

Please sign in to comment.