
Commit

Merge branch 'master' of git://git.apache.org/spark into additional-parquet-filter-testcases
sarutak committed Nov 24, 2014
2 parents 9016933 + cb0e9b0 commit 7550dcb
Showing 67 changed files with 2,366 additions and 645 deletions.
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -244,6 +244,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
}
}

object AccumulatorParam {

// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, duplicate code is kept in SparkContext for backward
// compatibility, so please update it accordingly if you modify the following implicit objects.

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
}

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
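A minimal usage sketch (illustrative, not part of this commit): with the implicit objects now in the `AccumulatorParam` companion object, the compiler resolves them through the implicit scope of the type, so code like the following compiles without `import SparkContext._`. The app name and master URL below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-example").setMaster("local[2]"))
    // IntAccumulatorParam is found via the AccumulatorParam companion object,
    // no `import SparkContext._` required.
    val counter = sc.accumulator(0)
    sc.parallelize(1 to 100).foreach(x => counter += 1)
    println(counter.value) // expected: 100
    sc.stop()
  }
}
```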
165 changes: 122 additions & 43 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

val startTime = System.currentTimeMillis()

/**
* Create a SparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
@@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
@@ -1624,47 +1624,74 @@ object SparkContext extends Logging {

private[spark] val DRIVER_IDENTIFIER = "<driver>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
// The following deprecated objects have already been copied to `object AccumulatorParam` to
// make the compiler find them automatically. They are duplicates kept only for backward
// compatibility; please update `object AccumulatorParam` accordingly if you plan to modify the
// following ones.

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
// The following deprecated functions have already been moved to `object RDD` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object RDD`.

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
RDD.rddToPairRDDFunctions(rdd)
}

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
RDD.rddToSequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)
RDD.rddToOrderedRDDFunctions(rdd)

implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)

implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)
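As a hedged illustration of the forwarding above (not part of this commit): pair-RDD methods now resolve through the implicits in `object RDD`, so the old `import SparkContext._` becomes optional. The sketch assumes a local master.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PairRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-example").setMaster("local[2]"))
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
    // rddToPairRDDFunctions is picked up from object RDD automatically,
    // so reduceByKey is available without importing SparkContext._.
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}
```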

// Implicit conversions to common Writable types, for saveAsSequenceFile

@@ -1690,40 +1717,49 @@ object SparkContext extends Logging {
arr.map(x => anyToWritable(x)).toArray)
}

// Helper objects for converting common types to Writable
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
// The following deprecated functions have already been moved to `object WritableConverter` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object WritableConverter`.

implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def intWritableConverter(): WritableConverter[Int] =
WritableConverter.intWritableConverter()

implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def longWritableConverter(): WritableConverter[Long] =
WritableConverter.longWritableConverter()

implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def doubleWritableConverter(): WritableConverter[Double] =
WritableConverter.doubleWritableConverter()

implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def floatWritableConverter(): WritableConverter[Float] =
WritableConverter.floatWritableConverter()

implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def booleanWritableConverter(): WritableConverter[Boolean] =
WritableConverter.booleanWritableConverter()

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
// getBytes method returns an array that is longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def bytesWritableConverter(): WritableConverter[Array[Byte]] =
WritableConverter.bytesWritableConverter()

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def stringWritableConverter(): WritableConverter[String] =
WritableConverter.stringWritableConverter()

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def writableWritableConverter[T <: Writable]() =
WritableConverter.writableWritableConverter()

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1950,3 +1986,46 @@ private[spark] class WritableConverter[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable

object WritableConverter {

// Helper objects for converting common types to Writable
private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

// The following implicit functions were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, the old functions are still kept in SparkContext for backward
// compatibility; they simply forward to the functions below.

implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)

implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)

implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)

implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)

implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
// getBytes method returns an array that is longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
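A brief sketch of how these converters are exercised (illustrative; the path below is hypothetical and assumed to hold IntWritable/Text pairs): `sequenceFile` takes its key and value converters implicitly, and they now resolve from the `WritableConverter` companion object rather than from `SparkContext._`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seq-example").setMaster("local[2]"))
    // intWritableConverter() and stringWritableConverter() are supplied implicitly
    // from object WritableConverter.
    val pairs = sc.sequenceFile[Int, String]("/tmp/example-seqfile") // hypothetical path
    println(pairs.count())
    sc.stop()
  }
}
```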
@@ -32,13 +32,13 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.AccumulatorParam._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
@@ -18,12 +18,13 @@
package org.apache.spark.deploy.master

import java.io._
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import akka.serialization.Serialization

import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer

import scala.reflect.ClassTag

/**
* Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +34,9 @@ import scala.reflect.ClassTag
*/
private[spark] class FileSystemPersistenceEngine(
val dir: String,
val serialization: Serializer)
val serialization: Serialization)
extends PersistenceEngine with Logging {

val serializer = serialization.newInstance()
new File(dir).mkdir()

override def persist(name: String, obj: Object): Unit = {
@@ -56,25 +56,27 @@ private[spark] class FileSystemPersistenceEngine(
private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }

val out = serializer.serializeStream(new FileOutputStream(file))
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
try {
out.writeObject(value)
out.write(serialized)
} finally {
out.close()
}

}

def deserializeFromFile[T](file: File): T = {
private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
try {
dis.readFully(fileData)
} finally {
dis.close()
}

serializer.deserializeStream(dis).readObject()
val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}

}
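For reference, a self-contained sketch of the Akka `Serialization` round trip the engine now relies on (illustrative only; the actor-system name and sample value are made up): `findSerializerFor` picks a serializer for the value being persisted, and `serializerFor(clazz)` recovers one for the requested class on read, mirroring `serializeIntoFile` and `deserializeFromFile` above.

```scala
import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}

object SerializationRoundTrip {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("example")
    val serialization: Serialization = SerializationExtension(system)

    val original: String = "app-20141124120000-0001"
    // Serialize the way serializeIntoFile does: pick a serializer for the value.
    val bytes: Array[Byte] = serialization.findSerializerFor(original).toBinary(original)

    // Deserialize the way deserializeFromFile does: pick a serializer for the target class.
    val restored = serialization.serializerFor(classOf[String]).fromBinary(bytes)
    println(restored == original) // expected: true

    system.shutdown()
  }
}
```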
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,6 +30,7 @@ import scala.util.Random
import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.Serialization
import akka.serialization.SerializationExtension

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@ private[spark] class Master(
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory = new FileSystemRecoveryModeFactory(conf)
val fsFactory =
new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(conf.getClass)
.newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
.newInstance(conf, SerializationExtension(context.system))
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
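A hedged configuration sketch for the modes matched above (property values are illustrative): `spark.deploy.recoveryMode` selects ZOOKEEPER, FILESYSTEM, or CUSTOM, and with CUSTOM the `spark.deploy.recoveryMode.factory` property names the `StandaloneRecoveryModeFactory` subclass that the reflective constructor call instantiates.

```scala
import org.apache.spark.SparkConf

object RecoveryModeConfig {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // ZOOKEEPER, FILESYSTEM, or CUSTOM; anything else falls back to BlackHolePersistenceEngine.
      .set("spark.deploy.recoveryMode", "FILESYSTEM")
      // Directory used by FileSystemPersistenceEngine (hypothetical path).
      .set("spark.deploy.recoveryDirectory", "/tmp/spark-recovery")
    println(conf.get("spark.deploy.recoveryMode"))
  }
}
```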
