
Merge branch 'master' of github.com:apache/spark into fix_autobatch
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Davies Liu committed Nov 3, 2014
2 parents 1d557fc + c238fb4 commit 6880b14
Showing 253 changed files with 10,667 additions and 1,553 deletions.
4 changes: 0 additions & 4 deletions assembly/pom.xml
@@ -146,10 +146,6 @@
<exclude>com/google/common/base/Present*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.commons.math3</pattern>
<shadedPattern>org.spark-project.commons.math3</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -49,6 +49,11 @@
<artifactId>spark-network-common_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -178,6 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
@@ -348,7 +349,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
new ConcurrentHashMap[Int, Array[MapStatus]]
}

private[spark] object MapOutputTracker {
private[spark] object MapOutputTracker extends Logging {

// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -381,6 +382,7 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
logError("Missing an output location for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
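The comment in this hunk describes compressing the serialized map output locations with GZIP before shipping them to reduce tasks. A minimal, self-contained sketch of that pattern, using plain Java serialization wrapped in GZIP streams; this is an illustration of the approach, not Spark's actual MapOutputTracker code:

```scala
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipSerDe {
  // Serialize an object and compress the resulting bytes with GZIP.
  def serialize(obj: Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Decompress and deserialize bytes produced by serialize().
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(data)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```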
65 changes: 64 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -41,7 +41,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -533,6 +533,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
minPartitions).setName(path)
}


/**
* Get an RDD for a Hadoop-readable dataset as a PortableDataStream for each file
* (useful for binary data)
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `val rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
classOf[String],
classOf[PortableDataStream],
updateConf,
minPartitions).setName(path)
}

/**
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory containing the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
classOf[FixedLengthBinaryInputFormat],
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
val data = br.map{ case (k, v) => v.getBytes}
data
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
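A short usage sketch for the two new SparkContext methods added above, assuming a local master, hypothetical HDFS paths, and that PortableDataStream exposes toArray() for reading the full file contents:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.input.PortableDataStream

object BinaryInputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-input").setMaster("local[2]"))

    // Whole-file access: one (path, PortableDataStream) pair per file.
    val files = sc.binaryFiles("hdfs://a-hdfs-path")               // hypothetical path
    val sizes = files.mapValues((stream: PortableDataStream) => stream.toArray().length)
    sizes.collect().foreach { case (path, n) => println(s"$path -> $n bytes") }

    // Fixed-length records: each element is a byte array of exactly recordLength bytes.
    val records = sc.binaryRecords("hdfs://a-hdfs-path/records", recordLength = 512)
    println(s"record count: ${records.count()}")

    sc.stop()
  }
}
```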
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,7 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.{NettyBlockTransferService}
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
case "nio" =>
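The SparkEnv change above flips the default value of spark.shuffle.blockTransferService from "nio" to "netty". A small sketch of pinning the transfer service explicitly through SparkConf; the app name and master are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransferServiceDemo {
  def main(args: Array[String]): Unit = {
    // "netty" is now the default; "nio" remains selectable through the same key.
    val conf = new SparkConf()
      .setAppName("transfer-service-demo")                   // placeholder app name
      .setMaster("local[2]")
      .set("spark.shuffle.blockTransferService", "nio")
    val sc = new SparkContext(conf)
    try println(sc.getConf.get("spark.shuffle.blockTransferService"))
    finally sc.stop()
  }
}
```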
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
reduceId: Int,
message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}
}

@@ -117,8 +119,8 @@ case object TaskKilled extends TaskFailedReason {
* the task crashed the JVM.
*/
@DeveloperApi
case object ExecutorLostFailure extends TaskFailedReason {
override def toErrorString: String = "ExecutorLostFailure (executor lost)"
case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
}

/**
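With the diff above, FetchFailed now carries the failure message and ExecutorLostFailure carries the lost executor's id. A sketch of driver-side code that pattern-matches on these shapes; the describeFailure helper is hypothetical:

```scala
import org.apache.spark.{ExecutorLostFailure, FetchFailed, TaskEndReason}

object FailureReporting {
  // Hypothetical helper that renders the updated failure reasons for logging.
  def describeFailure(reason: TaskEndReason): String = reason match {
    case FetchFailed(_, shuffleId, mapId, reduceId, message) =>
      s"fetch failure in shuffle $shuffleId (map=$mapId, reduce=$reduceId): $message"
    case ExecutorLostFailure(execId) =>
      s"executor $execId was lost"
    case other =>
      other.toString
  }
}
```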
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,11 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
@@ -32,7 +37,8 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
@@ -202,6 +208,8 @@ class JavaSparkContext(val sc: SparkContext)
def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)



/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -245,6 +253,78 @@ class JavaSparkContext(val sc: SparkContext)
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a PortableDataStream. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
* the value is the content of each file.
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; very large files may cause bad performance.
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a PortableDataStream. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
* the value is the content of each file.
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; very large files may cause bad performance.
*/
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory containing the input data files
* @return An RDD of data with values, represented as byte arrays
*/
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
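The Java API above mirrors the new Scala methods one-for-one. A brief sketch exercising the JavaSparkContext wrappers, written in Scala for consistency with the other examples; the paths are hypothetical:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

object JavaBinaryInputExample {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(
      new SparkConf().setAppName("java-binary-input").setMaster("local[2]"))

    // One (path, PortableDataStream) pair per file, as a JavaPairRDD.
    val files = jsc.binaryFiles("hdfs://a-hdfs-path")                 // hypothetical path
    println(s"file count: ${files.count()}")

    // Fixed-length records come back as a JavaRDD of byte arrays.
    val records = jsc.binaryRecords("hdfs://a-hdfs-path/records", 512)
    println(s"record count: ${records.count()}")

    jsc.stop()
  }
}
```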
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -186,11 +186,11 @@ private[spark] class Worker(
private def retryConnectToMaster() {
Utils.tryOrExit {
connectionAttemptCount += 1
logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
if (registered) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = None
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
tryRegisterAllMasters()
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel())
27 changes: 16 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -78,7 +78,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)

// Initialize Spark environment (using system properties read above)
conf.set("spark.executor.id", "executor." + executorId)
conf.set("spark.executor.id", executorId)
private val env = {
if (!isLocal) {
val port = conf.getInt("spark.executor.port", 0)
@@ -104,6 +104,9 @@
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

@@ -210,25 +213,27 @@
val resultSize = serializedDirectResult.limit

// directSend = sending directly back to the driver
val (serializedResult, directSend) = {
if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val serializedResult = {
if (resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
(ser.serialize(new IndirectTaskResult[Any](blockId)), false)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
(serializedDirectResult, true)
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

if (directSend) {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
} else {
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
}
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
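The Executor change introduces a three-way decision for task results: drop the result when it exceeds the configured maxResultSize, hand it to the BlockManager when it would not fit in an Akka frame, and send it inline otherwise. A standalone sketch of that routing logic; the Route type and chooseRoute function are illustrative, not Spark internals:

```scala
object ResultRouting {
  sealed trait Route
  case object Dropped extends Route         // larger than the configured maxResultSize; only metadata is sent
  case object ViaBlockManager extends Route // too large for an Akka frame; the driver fetches it indirectly
  case object DirectToDriver extends Route  // small enough to send inline with the status update

  def chooseRoute(resultSize: Long, maxResultSize: Long,
                  akkaFrameSize: Long, reservedBytes: Long): Route =
    if (resultSize > maxResultSize) Dropped
    else if (resultSize >= akkaFrameSize - reservedBytes) ViaBlockManager
    else DirectToDriver
}
```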
