readAll();
+
+ /**
+ * Clean all the records that are older than the threshold time. It can optionally
+ * wait for the deletion to complete before returning.
+ */
+ abstract public void clean(long threshTime, boolean waitForCompletion);
+
+ /**
+ * Close this log and release any resources.
+ */
+ abstract public void close();
+}
diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
new file mode 100644
index 0000000000000..02324189b7822
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util;
+
+/**
+ * This abstract class represents a handle that refers to a record written in a
+ * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
+ * It must contain all the information necessary for the record to be read and returned by
+ * an implementation of the WriteAheadLog class.
+ *
+ * @see org.apache.spark.streaming.util.WriteAheadLog
+ */
+@org.apache.spark.annotation.DeveloperApi
+public abstract class WriteAheadLogRecordHandle implements java.io.Serializable {
+}
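Taken together with the `write`, `read`, and `readAll` methods earlier in `WriteAheadLog.java` (and the callers shown later in this diff), the new API can be implemented outside Spark. A minimal, illustrative-only sketch in Scala; the in-memory log and `InMemoryRecordHandle` are hypothetical, not part of this patch:

```scala
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Hypothetical handle: just an index into the in-memory store.
case class InMemoryRecordHandle(index: Long) extends WriteAheadLogRecordHandle

// Illustrative-only log that keeps records in memory, remembering the write time
// so that clean() can drop everything older than the threshold.
class InMemoryWriteAheadLog extends WriteAheadLog {
  private val records = mutable.LinkedHashMap[Long, (Long, ByteBuffer)]()  // index -> (time, data)
  private var nextIndex = 0L

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = synchronized {
    val handle = InMemoryRecordHandle(nextIndex)
    records(nextIndex) = (time, record)
    nextIndex += 1
    handle
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
    records(handle.asInstanceOf[InMemoryRecordHandle].index)._2
  }

  override def readAll(): JIterator[ByteBuffer] = synchronized {
    records.values.map(_._2).toList.iterator.asJava
  }

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    // Deletion is synchronous here, so waitForCompletion has no additional effect.
    val expired = records.collect { case (idx, (time, _)) if time < threshTime => idx }
    records --= expired
  }

  override def close(): Unit = synchronized { records.clear() }
}
```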
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0a50485118588..7bfae253c3a0c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -77,7 +77,8 @@ object Checkpoint extends Logging {
}
/** Get checkpoint files present in the given directory, ordered by oldest-first */
- def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
+ def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
+
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
@@ -85,6 +86,7 @@ object Checkpoint extends Logging {
}
val path = new Path(checkpointDir)
+ val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
@@ -160,7 +162,7 @@ class CheckpointWriter(
}
// Delete old checkpoint files
- val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
+ val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
logInfo("Deleting " + file)
@@ -234,15 +236,24 @@ class CheckpointWriter(
private[streaming]
object CheckpointReader extends Logging {
- def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
- {
+ /**
+ * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
+ * files, then return None, else try to return the latest valid checkpoint object. If no
+ * checkpoint files could be read correctly, then return None (if ignoreReadError = true),
+ * or throw an exception (if ignoreReadError = false).
+ */
+ def read(
+ checkpointDir: String,
+ conf: SparkConf,
+ hadoopConf: Configuration,
+ ignoreReadError: Boolean = false): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)
// TODO(rxin): Why is this a def?!
def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
- val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
+ val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
if (checkpointFiles.isEmpty) {
return None
}
@@ -282,7 +293,10 @@ object CheckpointReader extends Logging {
})
// If none of checkpoint files could be read, then throw exception
- throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
+ if (!ignoreReadError) {
+ throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+ }
+ None
}
}
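A hedged usage sketch of the new `ignoreReadError` flag (`CheckpointReader` is `private[streaming]`, so this only compiles from within the streaming package; the checkpoint directory is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Checkpoint, CheckpointReader}

val checkpointDir = "/tmp/checkpoint"   // hypothetical path
val conf = new SparkConf().setAppName("example")

// With ignoreReadError = true, unreadable checkpoint files yield None instead of a
// SparkException, so the caller can fall back to creating a fresh context.
val checkpoint: Option[Checkpoint] =
  CheckpointReader.read(checkpointDir, conf, new Configuration(), ignoreReadError = true)
```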
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index f57f295874645..117cb59fb61c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -107,6 +107,19 @@ class StreamingContext private[streaming] (
*/
def this(path: String) = this(path, new Configuration)
+ /**
+ * Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
+ * @param path Path to the directory that was specified as the checkpoint directory
+ * @param sparkContext Existing SparkContext
+ */
+ def this(path: String, sparkContext: SparkContext) = {
+ this(
+ sparkContext,
+ CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
+ null)
+ }
+
+
if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
@@ -115,10 +128,12 @@ class StreamingContext private[streaming] (
private[streaming] val isCheckpointPresent = (cp_ != null)
private[streaming] val sc: SparkContext = {
- if (isCheckpointPresent) {
+ if (sc_ != null) {
+ sc_
+ } else if (isCheckpointPresent) {
new SparkContext(cp_.createSparkConf())
} else {
- sc_
+ throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
}
@@ -129,7 +144,7 @@ class StreamingContext private[streaming] (
private[streaming] val conf = sc.conf
- private[streaming] val env = SparkEnv.get
+ private[streaming] val env = sc.env
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
@@ -144,7 +159,7 @@ class StreamingContext private[streaming] (
}
}
- private val nextReceiverInputStreamId = new AtomicInteger(0)
+ private val nextInputStreamId = new AtomicInteger(0)
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
@@ -174,7 +189,9 @@ class StreamingContext private[streaming] (
/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
- SparkEnv.get.metricsSystem.registerSource(streamingSource)
+ assert(env != null)
+ assert(env.metricsSystem != null)
+ env.metricsSystem.registerSource(streamingSource)
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
@@ -224,7 +241,7 @@ class StreamingContext private[streaming] (
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()
+ private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
/**
* Create an input stream with any arbitrary user implemented receiver.
@@ -621,19 +638,59 @@ object StreamingContext extends Logging {
hadoopConf: Configuration = new Configuration(),
createOnError: Boolean = false
): StreamingContext = {
- val checkpointOption = try {
- CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
- } catch {
- case e: Exception =>
- if (createOnError) {
- None
- } else {
- throw e
- }
- }
+ val checkpointOption = CheckpointReader.read(
+ checkpointPath, new SparkConf(), hadoopConf, createOnError)
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+ * will be created by calling the provided `creatingFunc` on the provided `sparkContext`. Note
+ * that the SparkConf configuration in the checkpoint data will not be restored as the
+ * SparkContext has already been created.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
+ * @param sparkContext SparkContext using which the StreamingContext will be created
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: SparkContext => StreamingContext,
+ sparkContext: SparkContext
+ ): StreamingContext = {
+ getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
+ }
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
+ * will be created by calling the provided `creatingFunc` on the provided `sparkContext`. Note
+ * that the SparkConf configuration in the checkpoint data will not be restored as the
+ * SparkContext has already been created.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
+ * @param sparkContext SparkContext using which the StreamingContext will be created
+ * @param createOnError Whether to create a new StreamingContext if there is an
+ * error in reading checkpoint data. By default, an exception will be
+ * thrown on error.
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: SparkContext => StreamingContext,
+ sparkContext: SparkContext,
+ createOnError: Boolean
+ ): StreamingContext = {
+ val checkpointOption = CheckpointReader.read(
+ checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
+ checkpointOption.map(new StreamingContext(sparkContext, _, null))
+ .getOrElse(creatingFunc(sparkContext))
+ }
+
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
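A usage sketch of the new `getOrCreate` overload that reuses an already-running SparkContext; the checkpoint path and DStream setup are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))
val checkpointDir = "/tmp/checkpoint"   // hypothetical path

def createContext(sparkContext: SparkContext): StreamingContext = {
  val ssc = new StreamingContext(sparkContext, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... set up input streams and output operations here ...
  ssc
}

// Recreates the context from checkpoint data if it exists; otherwise calls createContext
// on the given SparkContext. The checkpointed SparkConf is not restored in either case.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _, sc)
ssc.start()
ssc.awaitTermination()
```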
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 4095a7cc84946..572d7d8e8753d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -32,13 +32,14 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
+import org.apache.hadoop.conf.Configuration
/**
* A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
@@ -655,6 +656,7 @@ object JavaStreamingContext {
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
*/
+ @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
checkpointPath: String,
factory: JavaStreamingContextFactory
@@ -676,6 +678,7 @@ object JavaStreamingContext {
* @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
* file system
*/
+ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
def getOrCreate(
checkpointPath: String,
hadoopConf: Configuration,
@@ -700,6 +703,7 @@ object JavaStreamingContext {
* @param createOnError Whether to create a new JavaStreamingContext if there is an
* error in reading checkpoint data.
*/
+ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
def getOrCreate(
checkpointPath: String,
hadoopConf: Configuration,
@@ -712,6 +716,117 @@ object JavaStreamingContext {
new JavaStreamingContext(ssc)
}
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the provided
+ * `creatingFunc` will be used to create a new JavaStreamingContext.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
+ * @param creatingFunc Function to create a new JavaStreamingContext
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: JFunction0[JavaStreamingContext]
+ ): JavaStreamingContext = {
+ val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+ creatingFunc.call().ssc
+ })
+ new JavaStreamingContext(ssc)
+ }
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the provided
+ * `creatingFunc` will be used to create a new JavaStreamingContext.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new JavaStreamingContext
+ * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
+ * file system
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: JFunction0[JavaStreamingContext],
+ hadoopConf: Configuration
+ ): JavaStreamingContext = {
+ val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+ creatingFunc.call().ssc
+ }, hadoopConf)
+ new JavaStreamingContext(ssc)
+ }
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the provided
+ * `creatingFunc` will be used to create a new JavaStreamingContext.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new JavaStreamingContext
+ * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
+ * file system
+ * @param createOnError Whether to create a new JavaStreamingContext if there is an
+ * error in reading checkpoint data.
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: JFunction0[JavaStreamingContext],
+ hadoopConf: Configuration,
+ createOnError: Boolean
+ ): JavaStreamingContext = {
+ val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
+ creatingFunc.call().ssc
+ }, hadoopConf, createOnError)
+ new JavaStreamingContext(ssc)
+ }
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the provided
+ * `creatingFunc` will be used to create a new JavaStreamingContext.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new JavaStreamingContext
+ * @param sparkContext SparkContext using which the StreamingContext will be created
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
+ sparkContext: JavaSparkContext
+ ): JavaStreamingContext = {
+ val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
+ creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
+ }, sparkContext.sc)
+ new JavaStreamingContext(ssc)
+ }
+
+ /**
+ * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
+ * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
+ * recreated from the checkpoint data. If the data does not exist, then the provided
+ * `creatingFunc` will be used to create a new JavaStreamingContext.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new JavaStreamingContext
+ * @param sparkContext SparkContext using which the StreamingContext will be created
+ * @param createOnError Whether to create a new JavaStreamingContext if there is an
+ * error in reading checkpoint data.
+ */
+ def getOrCreate(
+ checkpointPath: String,
+ creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
+ sparkContext: JavaSparkContext,
+ createOnError: Boolean
+ ): JavaStreamingContext = {
+ val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
+ creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
+ }, sparkContext.sc, createOnError)
+ new JavaStreamingContext(ssc)
+ }
+
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
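A usage sketch of the function-based overload that replaces the deprecated `JavaStreamingContextFactory`, written in Scala against the Java API for brevity (paths are placeholders):

```scala
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.api.java.JavaStreamingContext

val checkpointDir = "/tmp/checkpoint"   // hypothetical path

val creatingFunc = new JFunction0[JavaStreamingContext] {
  override def call(): JavaStreamingContext = {
    val jssc = new JavaStreamingContext("local[2]", "example", Seconds(1))
    jssc.checkpoint(checkpointDir)
    // ... set up input streams and output operations here ...
    jssc
  }
}

// Recreates the context from checkpoint data if it exists; otherwise invokes creatingFunc.
val jssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc)
jssc.start()
```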
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 24f99a2b929f5..83d41f5762444 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
- if (firstNum.size > num) println("...")
+ if (firstNum.length > num) println("...")
println()
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index e652702e213ef..e4ad4b509d8d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -41,6 +41,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
ssc.graph.addInputStream(this)
+ /** This is a unique identifier for the input stream. */
+ val id = ssc.getNewInputStreamId()
+
/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 8be04314c4285..ba88416ef4009 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -39,9 +39,6 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
- /** This is an unique identifier for the receiver input stream. */
- val id = ssc.getNewReceiverStreamId()
-
/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
@@ -72,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
+ // Register the input blocks information into InputInfoTracker
+ val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
@@ -82,7 +83,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
- _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
+ _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 93caa4ba35c7f..ebdf418f4ab6a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,14 +16,17 @@
*/
package org.apache.spark.streaming.rdd
+import java.nio.ByteBuffer
+
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
+import org.apache.commons.io.FileUtils
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
+import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -31,26 +34,27 @@ import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, Wri
* the segment of the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
- * @param segment segment of the write ahead log having the partition data
+ * @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
- val segment: WriteAheadLogFileSegment)
+ val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
/**
* This class represents a special case of the BlockRDD where the data blocks in
- * the block manager are also backed by segments in write ahead logs. For reading
+ * the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment.
+ * If it does not find them, it looks up the corresponding data in the write ahead log.
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
- * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
+ * @param storeInBlockManager Whether to store in the block manager after reading
+ * from the WAL record
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -58,15 +62,15 @@ private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
- @transient segments: Array[WriteAheadLogFileSegment],
+ @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {
require(
- blockIds.length == segments.length,
+ blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
- s"the same as number of segments (${segments.length}})!")
+ s"the same as number of WAL record handles (${walRecordHandles.length}})!")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -75,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+ new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
}
}
/**
* Gets the partition data by getting the corresponding block from the block manager.
- * If the block does not exist, then the data is read from the corresponding segment
+ * If the block does not exist, then the data is read from the corresponding record
* in write ahead log files.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -96,10 +100,35 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
- val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
- val dataRead = reader.read(partition.segment)
- reader.close()
- logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+ var dataRead: ByteBuffer = null
+ var writeAheadLog: WriteAheadLog = null
+ try {
+ // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+ // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+ // writing log data. However, the directory is not needed if data needs to be read, hence
+ // a dummy path is provided to satisfy the method parameter requirements.
+ // FileBasedWriteAheadLog will not create any file or directory at that path.
+ val dummyDirectory = FileUtils.getTempDirectoryPath()
+ writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+ SparkEnv.get.conf, dummyDirectory, hadoopConf)
+ dataRead = writeAheadLog.read(partition.walRecordHandle)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException(
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+ } finally {
+ if (writeAheadLog != null) {
+ writeAheadLog.close()
+ writeAheadLog = null
+ }
+ }
+ if (dataRead == null) {
+ throw new SparkException(
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+ s"read returned null")
+ }
+ logInfo(s"Read partition data of $this from write ahead log, record handle " +
+ partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -111,14 +140,20 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
/**
* Get the preferred location of the partition. This returns the locations of the block
- * if it is present in the block manager, else it returns the location of the
- * corresponding segment in HDFS.
+ * if it is present in the block manager; otherwise, if a FileBasedWriteAheadLogSegment is used,
+ * it returns the location of the corresponding file segment in HDFS.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
- blockLocations.getOrElse(
- HdfsUtils.getFileSegmentLocations(
- partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
+ blockLocations.getOrElse {
+ partition.walRecordHandle match {
+ case fileSegment: FileBasedWriteAheadLogSegment =>
+ HdfsUtils.getFileSegmentLocations(
+ fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+ case _ =>
+ Seq.empty
+ }
+ }
}
}
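The read path in `compute()` above can be summarized as a standalone helper; this sketch is not part of the patch and simply mirrors its try/catch/finally logic against the generic WriteAheadLog interface:

```scala
import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.SparkException
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Read one record from an already-created WriteAheadLog and always close the log afterwards.
def readRecord(log: WriteAheadLog, handle: WriteAheadLogRecordHandle): ByteBuffer = {
  var data: ByteBuffer = null
  try {
    data = log.read(handle)
  } catch {
    case NonFatal(e) =>
      throw new SparkException(s"Could not read data from write ahead log record $handle", e)
  } finally {
    log.close()
  }
  if (data == null) {
    throw new SparkException(s"Read of write ahead log record $handle returned null")
  }
  data
}
```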
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 297bf04c0c25e..651b534ac1900 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,18 +17,18 @@
package org.apache.spark.streaming.receiver
-import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.{existentials, postfixOps}
-import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
+import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
- segment: WriteAheadLogFileSegment
+ walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult
@@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
- private val rollingInterval = conf.getInt(
- "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
- private val maxFailures = conf.getInt(
- "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
@@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
s"$effectiveStorageLevel when write ahead log is enabled")
}
- // Manages rolling log files
- private val logManager = new WriteAheadLogManager(
- checkpointDirToLogDir(checkpointDir, streamId),
- hadoopConf, rollingInterval, maxFailures,
- callerName = this.getClass.getSimpleName,
- clock = clock
- )
+ // Write ahead log that stores the received block data
+ private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+ conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf)
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
@@ -183,21 +175,22 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
- logManager.writeToLog(serializedBlock)
+ writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
- // Combine the futures, wait for both to complete, and return the write ahead log segment
+ // Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
- val segment = Await.result(combinedFuture, blockStoreTimeout)
- WriteAheadLogBasedStoreResult(blockId, segment)
+ val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
+ WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
}
def cleanupOldBlocks(threshTime: Long) {
- logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
+ writeAheadLog.clean(threshTime, false)
}
def stop() {
- logManager.stop()
+ writeAheadLog.close()
+ executionContext.shutdown()
}
}
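The parallel store above boils down to a plain `Future.zip`; a Spark-free sketch where the two callbacks are hypothetical stand-ins for the block manager store and the WAL write:

```scala
import java.nio.ByteBuffer

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

def storeBlockInParallel(
    serializedBlock: ByteBuffer,
    storeInBlockManager: ByteBuffer => Unit,
    writeToWal: ByteBuffer => WriteAheadLogRecordHandle)
    (implicit ec: ExecutionContext): WriteAheadLogRecordHandle = {
  val blockManagerFuture = Future { storeInBlockManager(serializedBlock) }
  val walFuture = Future { writeToWal(serializedBlock) }
  // zip completes when both futures complete; keep only the WAL record handle (_._2),
  // then block up to a timeout, as the handler above does with blockStoreTimeout.
  val combined = blockManagerFuture.zip(walFuture).map(_._2)
  Await.result(combined, 30.seconds)
}
```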
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 89af40330b9d9..93f047b91018f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl(
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
private val receivedBlockHandler: ReceivedBlockHandler = {
- if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
@@ -146,7 +147,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
- trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
+ trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
@@ -169,13 +170,13 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
- trackerEndpoint.askWithReply[Boolean](msg)
+ trackerEndpoint.askWithRetry[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
+ trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 92dc113f397ca..5b9bfbf9b01e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -24,6 +24,7 @@ import org.apache.spark.streaming.Time
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
+ * @param streamIdToNumRecords A map of input stream id to the number of records received
* @param submissionTime Clock time of when jobs of this batch was submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
@@ -32,7 +33,7 @@ import org.apache.spark.streaming.Time
@DeveloperApi
case class BatchInfo(
batchTime: Time,
- receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
+ streamIdToNumRecords: Map[Int, Long],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
@@ -58,4 +59,9 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption
+
+ /**
+ * The number of records received by the receivers in this batch.
+ */
+ def numRecords: Long = streamIdToNumRecords.values.sum
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
new file mode 100644
index 0000000000000..a72efccf2f994
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{Time, StreamingContext}
+
+/** Tracks the information of an input stream for a given batch time. */
+private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
+
+/**
+ * This class manages all the input streams as well as their input data statistics. The information
+ * will be exposed through StreamingListener for monitoring.
+ */
+private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging {
+
+ // Map to track all the InputInfo related to specific batch time and input stream.
+ private val batchTimeToInputInfos = new mutable.HashMap[Time, mutable.HashMap[Int, InputInfo]]
+
+ /** Report the input information with batch time to the tracker */
+ def reportInfo(batchTime: Time, inputInfo: InputInfo): Unit = synchronized {
+ val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
+ new mutable.HashMap[Int, InputInfo]())
+
+ if (inputInfos.contains(inputInfo.inputStreamId)) {
+ throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId}} for batch" +
+ s"$batchTime is already added into InputInfoTracker, this is a illegal state")
+ }
+ inputInfos += ((inputInfo.inputStreamId, inputInfo))
+ }
+
+ /** Get all the input streams' information for the specified batch time */
+ def getInfo(batchTime: Time): Map[Int, InputInfo] = synchronized {
+ val inputInfos = batchTimeToInputInfos.get(batchTime)
+ // Convert mutable HashMap to immutable Map for the caller
+ inputInfos.map(_.toMap).getOrElse(Map[Int, InputInfo]())
+ }
+
+ /** Cleanup the tracked input information that is older than the threshold batch time */
+ def cleanup(batchThreshTime: Time): Unit = synchronized {
+ val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
+ logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
+ batchTimeToInputInfos --= timesToCleanup
+ }
+}
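A Spark-free sketch of the tracker's bookkeeping and intended call sequence; batch time is reduced to a `Long`, and the class and method names below are illustrative, not the Spark types:

```scala
import scala.collection.mutable

case class SimpleInputInfo(inputStreamId: Int, numRecords: Long)

class SimpleInputInfoTracker {
  // batch time -> (input stream id -> info), mirroring batchTimeToInputInfos above
  private val batchToInfos = new mutable.HashMap[Long, mutable.HashMap[Int, SimpleInputInfo]]

  def reportInfo(batchTime: Long, info: SimpleInputInfo): Unit = synchronized {
    val infos = batchToInfos.getOrElseUpdate(batchTime, new mutable.HashMap[Int, SimpleInputInfo])
    require(!infos.contains(info.inputStreamId),
      s"Input stream ${info.inputStreamId} for batch $batchTime was already reported")
    infos(info.inputStreamId) = info
  }

  def getInfo(batchTime: Long): Map[Int, SimpleInputInfo] = synchronized {
    batchToInfos.get(batchTime).map(_.toMap).getOrElse(Map.empty)
  }

  def cleanup(threshTime: Long): Unit = synchronized {
    batchToInfos --= batchToInfos.keys.filter(_ < threshTime).toSeq
  }
}

// Intended flow: inputs report at job-generation time, the scheduler reads the per-stream
// counts for the batch, and old entries are dropped once the batch is fully processed.
val tracker = new SimpleInputInfoTracker
tracker.reportInfo(1000L, SimpleInputInfo(inputStreamId = 0, numRecords = 50))
tracker.reportInfo(1000L, SimpleInputInfo(inputStreamId = 1, numRecords = 70))
assert(tracker.getInfo(1000L).values.map(_.numRecords).sum == 120)
tracker.cleanup(2000L)   // everything older than batch 2000 is forgotten
```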
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 30cf87f5b7dd1..3c481bf3491f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -25,15 +25,49 @@ import scala.util.Try
*/
private[streaming]
class Job(val time: Time, func: () => _) {
- var id: String = _
- var result: Try[_] = null
+ private var _id: String = _
+ private var _outputOpId: Int = _
+ private var isSet = false
+ private var _result: Try[_] = null
def run() {
- result = Try(func())
+ _result = Try(func())
}
- def setId(number: Int) {
- id = "streaming job " + time + "." + number
+ def result: Try[_] = {
+ if (_result == null) {
+ throw new IllegalStateException("Cannot access result before job finishes")
+ }
+ _result
+ }
+
+ /**
+ * @return the globally unique id of this Job.
+ */
+ def id: String = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access id before calling setId")
+ }
+ _id
+ }
+
+ /**
+ * @return the output op id of this Job. Each Job has a unique output op id in the same JobSet.
+ */
+ def outputOpId: Int = {
+ if (!isSet) {
+ throw new IllegalStateException("Cannot access number before calling setId")
+ }
+ _outputOpId
+ }
+
+ def setOutputOpId(outputOpId: Int) {
+ if (isSet) {
+ throw new IllegalStateException("Cannot call setOutputOpId more than once")
+ }
+ isSet = true
+ _id = s"streaming job $time.$outputOpId"
+ _outputOpId = outputOpId
}
override def toString: String = id
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 2467d50839add..9f93d6cbc3c20 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -243,9 +243,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
- val receivedBlockInfos =
- jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
- jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
+ val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
+ val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
+ jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
@@ -266,6 +266,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
+ jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}
@@ -278,6 +279,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// been saved to checkpoints, so its safe to delete block metadata and data WAL files
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
+ jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 508b89278dcba..1d1ddaaccf217 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -50,6 +50,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
+ // A tracker to track all the input stream information as well as the processed record counts
+ var inputInfoTracker: InputInfoTracker = null
+
private var eventLoop: EventLoop[JobSchedulerEvent] = null
def start(): Unit = synchronized {
@@ -65,6 +68,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
+ inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
@@ -172,16 +176,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ssc.waiter.notifyError(e)
}
- private class JobHandler(job: Job) extends Runnable {
+ private class JobHandler(job: Job) extends Runnable with Logging {
def run() {
- eventLoop.post(JobStarted(job))
- // Disable checks for existing output directories in jobs launched by the streaming scheduler,
- // since we may need to write output to an existing directory during checkpoint recovery;
- // see SPARK-4835 for more details.
- PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
- job.run()
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+ try {
+ eventLoop.post(JobStarted(job))
+ // Disable checks for existing output directories in jobs launched by the streaming
+ // scheduler, since we may need to write output to an existing directory during checkpoint
+ // recovery; see SPARK-4835 for more details.
+ PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+ job.run()
+ }
+ eventLoop.post(JobCompleted(job))
+ } finally {
+ ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+ ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
- eventLoop.post(JobCompleted(job))
}
}
}
+
+private[streaming] object JobScheduler {
+ val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+ val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}
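A runnable sketch of the local-property pattern `JobHandler` now uses, with the two keys defined above; tagging every Spark job spawned by a streaming output operation lets the UI map it back to its batch time and output op id:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))
val batchTimeMs = 1430000000000L   // hypothetical batch time in milliseconds
val outputOpId = 0

sc.setLocalProperty("spark.streaming.internal.batchTime", batchTimeMs.toString)
sc.setLocalProperty("spark.streaming.internal.outputOpId", outputOpId.toString)
try {
  // Every Spark job submitted here carries the two properties.
  sc.parallelize(1 to 100).count()
} finally {
  // Always clear the tags so later, unrelated jobs are not mis-attributed.
  sc.setLocalProperty("spark.streaming.internal.batchTime", null)
  sc.setLocalProperty("spark.streaming.internal.outputOpId", null)
}
```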
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 5b134877d0b2d..e6be63b2ddbdc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,14 +28,14 @@ private[streaming]
case class JobSet(
time: Time,
jobs: Seq[Job],
- receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty) {
+ streamIdToNumRecords: Map[Int, Long] = Map.empty) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
private var processingStartTime = -1L // when the first job of this jobset started processing
private var processingEndTime = -1L // when the last job of this jobset finished processing
- jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+ jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
incompleteJobs ++= jobs
def handleJobStart(job: Job) {
@@ -64,7 +64,7 @@ case class JobSet(
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
- receivedBlockInfo,
+ streamIdToNumRecords,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 200cf4ef4b0f1..14e769a281f51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -70,7 +70,7 @@ private[streaming] class ReceivedBlockTracker(
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
- private val logManagerOption = createLogManager()
+ private val writeAheadLogOption = createWriteAheadLog()
private var lastAllocatedBatchTime: Time = null
@@ -155,12 +155,12 @@ private[streaming] class ReceivedBlockTracker(
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
- logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
+ writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
}
/** Stop the block tracker. */
def stop() {
- logManagerOption.foreach { _.stop() }
+ writeAheadLogOption.foreach { _.close() }
}
/**
@@ -190,9 +190,10 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks --= batchTimes
}
- logManagerOption.foreach { logManager =>
+ writeAheadLogOption.foreach { writeAheadLog =>
logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
- logManager.readFromLog().foreach { byteBuffer =>
+ import scala.collection.JavaConversions._
+ writeAheadLog.readAll().foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
case BlockAdditionEvent(receivedBlockInfo) =>
@@ -208,10 +209,10 @@ private[streaming] class ReceivedBlockTracker(
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent) {
- if (isLogManagerEnabled) {
+ if (isWriteAheadLogEnabled) {
logDebug(s"Writing to log $record")
- logManagerOption.foreach { logManager =>
- logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record)))
+ writeAheadLogOption.foreach { logManager =>
+ logManager.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis())
}
}
}
@@ -222,8 +223,8 @@ private[streaming] class ReceivedBlockTracker(
}
/** Optionally create the write ahead log manager only if the feature is enabled */
- private def createLogManager(): Option[WriteAheadLogManager] = {
- if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ private def createWriteAheadLog(): Option[WriteAheadLog] = {
+ if (WriteAheadLogUtils.enableReceiverLog(conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
@@ -231,19 +232,16 @@ private[streaming] class ReceivedBlockTracker(
"See documentation for more details.")
}
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
- val rollingIntervalSecs = conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
- val logManager = new WriteAheadLogManager(logDir, hadoopConf,
- rollingIntervalSecs = rollingIntervalSecs, clock = clock,
- callerName = "ReceivedBlockHandlerMaster")
- Some(logManager)
+
+ val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
+ Some(log)
} else {
None
}
}
- /** Check if the log manager is enabled. This is only used for testing purposes. */
- private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
+ /** Check if the write ahead log is enabled. This is only used for testing purposes. */
+ private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
}
private[streaming] object ReceivedBlockTracker {
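The recovery loop above, factored into a hedged standalone helper; plain Java serialization stands in for the `Utils.serialize`/`deserialize` pair the tracker actually uses:

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.nio.ByteBuffer

import scala.collection.JavaConversions._

import org.apache.spark.streaming.util.WriteAheadLog

// readAll() returns a java.util.Iterator[ByteBuffer]; JavaConversions lets us foreach over it.
def replayLog[E](log: WriteAheadLog)(handleEvent: E => Unit): Unit = {
  log.readAll().foreach { byteBuffer =>
    val in = new ObjectInputStream(new ByteArrayInputStream(byteBuffer.array))
    try {
      handleEvent(in.readObject().asInstanceOf[E])
    } finally {
      in.close()
    }
  }
}
```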
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index c4ead6f30a63d..1af65716d3003 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.language.existentials
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -125,7 +126,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
// Signal the receivers to delete old block data
- if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
receiverInfo.values.flatMap { info => Option(info.endpoint) }
.foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }
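`WriteAheadLogUtils.enableReceiverLog` itself is not shown in this diff; judging from the inline checks it replaces here and in `ReceiverSupervisorImpl` and `ReceivedBlockTracker`, it presumably amounts to the following (assumed behavior only):

```scala
import org.apache.spark.SparkConf

// Same lookup the removed inline checks performed.
def enableReceiverLog(conf: SparkConf): Boolean =
  conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
```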
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index df1c0a10704c3..e219e27785533 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
-import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.UIUtils
private[ui] abstract class BatchTableBase(tableId: String) {
@@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) {
<th>Processing Time</th>
}
- protected def baseRow(batch: BatchInfo): Seq[Node] = {
+ protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
- val eventCount = batch.receivedBlockInfo.values.map {
- receivers => receivers.map(_.numRecords).sum
- }.sum
+ val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
- <td sorttable_customkey={batchTime.toString}>{formattedBatchTime}</td>
+ <td sorttable_customkey={batchTime.toString}>
+   <a href={s"batch?id=$batchTime"}>
+     {formattedBatchTime}
+   </a>
+ </td>
<td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
{formattedSchedulingDelay}
@@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def renderRows: Seq[Node]
}
-private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
- extends BatchTableBase("active-batches-table") {
+private[ui] class ActiveBatchTable(
+ runningBatches: Seq[BatchUIData],
+ waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
@@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>
)
}
- private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>processing</td>
}
- private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>queued</td>
}
}
-private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
extends BatchTableBase("completed-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ Total Delay |
@@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
batches.flatMap(batch => {completedBatchRow(batch)}
)
}
- private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
+ private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
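The tables above now read the event count from BatchUIData.numRecords instead of summing ReceivedBlockInfo per receiver. A small sketch of that aggregation with hypothetical per-stream counts; the Map shape mirrors the receiverNumRecords field BatchUIData introduces later in this patch:

  // Hypothetical records received by stream 0 and stream 1 in one batch.
  val receiverNumRecords: Map[Int, Long] = Map(0 -> 300L, 1 -> 200L)

  // BatchUIData.numRecords is simply the per-stream sum, so the UI no longer needs block infos.
  val numRecords: Long = receiverNumRecords.values.sum  // 500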
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
new file mode 100644
index 0000000000000..2da9a29e2529e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+
+private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+ private val streamingListener = parent.listener
+ private val sparkListener = parent.ssc.sc.jobProgressListener
+
+ private def columns: Seq[Node] = {
+ Output Op Id |
+ Description |
+ Duration |
+ Job Id |
+ Duration |
+ Stages: Succeeded/Total |
+ Tasks (for all stages): Succeeded/Total |
+ Error |
+ }
+
+ /**
+ * Generate a row for a Spark Job. Because duplicated output op infos need to be collapsed into
+ * one cell, we use "rowspan" for the first row of an output op.
+ */
+ def generateJobRow(
+ outputOpId: OutputOpId,
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ sparkJob: JobUIData): Seq[Node] = {
+ val lastStageInfo = Option(sparkJob.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+ val duration: Option[Long] = {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val lastFailureReason =
+ sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+ dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
+ flatMap(info => info.failureReason).headOption.getOrElse("")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
+ val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+
+ // In the first row, the output op id and its information need to be shown. In other rows,
+ // these cells are covered by the first row's "rowspan".
+ // scalastyle:off
+ val prefixCells =
+ if (isFirstRow) {
+ {outputOpId.toString} |
+
+
+ {lastStageDescription}
+ {lastStageName}
+ |
+ {formattedOutputOpDuration} |
+ } else {
+ Nil
+ }
+ // scalastyle:on
+
+
+ {prefixCells}
+
+
+ {sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
+
+ |
+
+ {formattedDuration}
+ |
+
+ {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+ {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
+ {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
+ |
+
+ {
+ UIUtils.makeProgressBar(
+ started = sparkJob.numActiveTasks,
+ completed = sparkJob.numCompletedTasks,
+ failed = sparkJob.numFailedTasks,
+ skipped = sparkJob.numSkippedTasks,
+ total = sparkJob.numTasks - sparkJob.numSkippedTasks)
+ }
+ |
+ {failureReasonCell(lastFailureReason)}
+
+ }
+
+ private def generateOutputOpIdRow(
+ outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
+ val sparkjobDurations = sparkJobs.map(sparkJob => {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
+ val formattedOutputOpDuration =
+ if (sparkjobDurations.exists(_ == None)) {
+ // If any job does not finish, set "formattedOutputOpDuration" to "-"
+ "-"
+ } else {
+ UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ }
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+ sparkJobs.tail.map { sparkJob =>
+ generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+ }.flatMap(x => x)
+ }
+
+ private def failureReasonCell(failureReason: String): Seq[Node] = {
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+
+ +details
+ ++
+
+ // scalastyle:on
+ } else {
+ ""
+ }
+ {failureReasonSummary}{details} |
+ }
+
+ private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
+ sparkListener.activeJobs.get(sparkJobId).orElse {
+ sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
+ sparkListener.failedJobs.find(_.jobId == sparkJobId)
+ }
+ }
+ }
+
+ /**
+ * Generate the job table for the batch.
+ */
+ private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
+ val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
+ sortBy(_._1). // sorted by OutputOpId
+ map { case (outputOpId, outputOpIdAndSparkJobIds) =>
+ // sort SparkJobIds for each OutputOpId
+ (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+ }
+ sparkListener.synchronized {
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+ outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
+ // Filter out Spark job ids that don't exist in sparkListener
+ (outputOpId, sparkJobIds.flatMap(getJobData))
+ }
+
+
+
+ {columns}
+
+
+ {
+ outputOpIdWithJobs.map {
+ case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+ }
+ }
+
+
+ }
+ }
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
+ throw new IllegalArgumentException(s"Missing id parameter")
+ }
+ val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+
+ val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
+ throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
+ }
+
+ val formattedSchedulingDelay =
+ batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedProcessingTime =
+ batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
+ val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+
+ val summary: NodeSeq =
+
+
+ -
+ Batch Duration:
+ {UIUtils.formatDuration(streamingListener.batchDuration)}
+
+ -
+ Input data size:
+ {batchUIData.numRecords} records
+
+ -
+ Scheduling delay:
+ {formattedSchedulingDelay}
+
+ -
+ Processing time:
+ {formattedProcessingTime}
+
+ -
+ Total delay:
+ {formattedTotalDelay}
+
+
+
+
+ val jobTable =
+ if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
+ Cannot find any job for Batch {formattedBatchTime}.
+ } else {
+ generateJobTable(batchUIData)
+ }
+
+ val content = summary ++ jobTable
+
+ UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+ }
+}
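generateJobTable above first groups the recorded (outputOpId, sparkJobId) pairs by output operation, sorts the groups, and sorts the Spark job ids inside each group. A self-contained sketch of that grouping step with hypothetical ids (the case class name matches the one added in BatchUIData.scala below):

  case class OutputOpIdAndSparkJobId(outputOpId: Int, sparkJobId: Int)

  val pairs = Seq(
    OutputOpIdAndSparkJobId(outputOpId = 1, sparkJobId = 7),
    OutputOpIdAndSparkJobId(outputOpId = 0, sparkJobId = 5),
    OutputOpIdAndSparkJobId(outputOpId = 1, sparkJobId = 6))

  // Group by output op id, sort the groups, then sort the Spark job ids within each group.
  val grouped: Seq[(Int, Seq[Int])] =
    pairs.groupBy(_.outputOpId).toSeq.sortBy(_._1).map { case (opId, ps) =>
      (opId, ps.map(_.sparkJobId).sorted)
    }
  // grouped == Seq((0, Seq(5)), (1, Seq(6, 7)))

Each group then renders one row per Spark job, with the output-op cells collapsed via "rowspan" as in generateJobRow.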
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
new file mode 100644
index 0000000000000..99e10d2b0be12
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
+
+private[ui] case class BatchUIData(
+ val batchTime: Time,
+ val receiverNumRecords: Map[Int, Long],
+ val submissionTime: Long,
+ val processingStartTime: Option[Long],
+ val processingEndTime: Option[Long],
+ var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+
+ /**
+ * Time taken for the first job of this batch to start processing from the time this batch
+ * was submitted to the streaming scheduler. Essentially, it is
+ * `processingStartTime` - `submissionTime`.
+ */
+ def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
+
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they started
+ * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+ */
+ def processingDelay: Option[Long] = {
+ for (start <- processingStartTime;
+ end <- processingEndTime)
+ yield end - start
+ }
+
+ /**
+ * Time taken for all the jobs of this batch to finish processing from the time they
+ * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+ */
+ def totalDelay: Option[Long] = processingEndTime.map(_ - submissionTime)
+
+ /**
+ * The number of records received by the receivers in this batch.
+ */
+ def numRecords: Long = receiverNumRecords.map(_._2).sum
+}
+
+private[ui] object BatchUIData {
+
+ def apply(batchInfo: BatchInfo): BatchUIData = {
+ new BatchUIData(
+ batchInfo.batchTime,
+ batchInfo.streamIdToNumRecords,
+ batchInfo.submissionTime,
+ batchInfo.processingStartTime,
+ batchInfo.processingEndTime
+ )
+ }
+}
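The three delay metrics defined above are plain differences of the batch timestamps. A worked example with hypothetical times (milliseconds):

  val submissionTime = 1000L
  val processingStartTime: Option[Long] = Some(1200L)
  val processingEndTime: Option[Long] = Some(1700L)

  val schedulingDelay = processingStartTime.map(_ - submissionTime)                        // Some(200)
  val processingDelay = for (s <- processingStartTime; e <- processingEndTime) yield e - s // Some(500)
  val totalDelay = processingEndTime.map(_ - submissionTime)                               // Some(700)
  // When all three are defined, totalDelay == schedulingDelay + processingDelay.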
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index be1e8686cf9fa..d2729fa70d6d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,29 +17,58 @@
package org.apache.spark.streaming.ui
-import scala.collection.mutable.{Queue, HashMap}
+import java.util.LinkedHashMap
+import java.util.{Map => JMap}
+import java.util.Properties
+import scala.collection.mutable.{ArrayBuffer, Queue, HashMap, SynchronizedBuffer}
+
+import org.apache.spark.scheduler._
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
-import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
- extends StreamingListener {
+ extends StreamingListener with SparkListener {
- private val waitingBatchInfos = new HashMap[Time, BatchInfo]
- private val runningBatchInfos = new HashMap[Time, BatchInfo]
- private val completedBatchInfos = new Queue[BatchInfo]
- private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+ private val waitingBatchUIData = new HashMap[Time, BatchUIData]
+ private val runningBatchUIData = new HashMap[Time, BatchUIData]
+ private val completedBatchUIData = new Queue[BatchUIData]
+ private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
+ // Because onJobStart and onBatchXXX messages are processed in different threads,
+ // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
+ // cannot use a map of (Time, BatchUIData).
+ private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
+ new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+ override def removeEldestEntry(
+ p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+ // If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine if
+ // SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdSparkJobIdPair"
+ // may add information for a batch that has already been removed when processing
+ // "onJobStart", which would be a memory leak.
+ //
+ // To avoid the memory leak, we control the size of "batchTimeToOutputOpIdSparkJobIdPair"
+ // and evict the eldest entry.
+ //
+ // Note: if "onJobStart" happens before "onBatchSubmitted", the size of
+ // "batchTimeToOutputOpIdSparkJobIdPair" may temporarily be greater than the number of
+ // retained batches, so here we use "10" as slack to handle such a case. This is not a
+ // perfect solution, but at least it can handle most cases.
+ size() >
+ waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
+ }
+ }
+
+
val batchDuration = ssc.graph.batchDuration.milliseconds
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
@@ -62,37 +91,62 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
synchronized {
- waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
+ BatchUIData(batchSubmitted.batchInfo)
}
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
- runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
- waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
+ val batchUIData = BatchUIData(batchStarted.batchInfo)
+ runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo)
+ waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
- batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalReceivedRecords += infos.map(_.numRecords).sum
- }
+ totalReceivedRecords += batchUIData.numRecords
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
synchronized {
- waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- completedBatchInfos.enqueue(batchCompleted.batchInfo)
- if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
+ waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
+ val batchUIData = BatchUIData(batchCompleted.batchInfo)
+ completedBatchUIData.enqueue(batchUIData)
+ if (completedBatchUIData.size > batchUIDataLimit) {
+ val removedBatch = completedBatchUIData.dequeue()
+ batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
+ }
totalCompletedBatches += 1L
- batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalProcessedRecords += infos.map(_.numRecords).sum
+ totalProcessedRecords += batchUIData.numRecords
+ }
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+ getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
+ var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
+ if (outputOpIdToSparkJobIds == null) {
+ outputOpIdToSparkJobIds =
+ new ArrayBuffer[OutputOpIdAndSparkJobId]()
+ with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+ batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
+ outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
}
}
- def numReceivers: Int = synchronized {
- ssc.graph.getReceiverInputStreams().size
+ private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
+ val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)
+ if (batchTime == null) {
+ // Not submitted from JobScheduler
+ None
+ } else {
+ val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)
+ assert(outputOpId != null)
+ Some(Time(batchTime.toLong) -> outputOpId.toInt)
+ }
}
+ def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
+
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}
@@ -106,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def numUnprocessedBatches: Long = synchronized {
- waitingBatchInfos.size + runningBatchInfos.size
+ waitingBatchUIData.size + runningBatchUIData.size
}
- def waitingBatches: Seq[BatchInfo] = synchronized {
- waitingBatchInfos.values.toSeq
+ def waitingBatches: Seq[BatchUIData] = synchronized {
+ waitingBatchUIData.values.toSeq
}
- def runningBatches: Seq[BatchInfo] = synchronized {
- runningBatchInfos.values.toSeq
+ def runningBatches: Seq[BatchUIData] = synchronized {
+ runningBatchUIData.values.toSeq
}
- def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
- completedBatchInfos.toSeq
+ def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
+ completedBatchUIData.toSeq
}
def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -134,29 +188,26 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
- val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
- val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
- (0 until numReceivers).map { receiverId =>
- val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
- batchInfo.get(receiverId).getOrElse(Array.empty)
- }
- val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
- // calculate records per second for each batch
- blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
- }
- val distributionOption = Distribution(recordsOfParticularReceiver)
- (receiverId, distributionOption)
+ val latestBatchInfos = retainedBatches.reverse.take(batchUIDataLimit)
+ val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
+ val streamIds = ssc.graph.getInputStreams().map(_.id)
+ streamIds.map { id =>
+ val recordsOfParticularReceiver =
+ latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
+ val distribution = Distribution(recordsOfParticularReceiver)
+ (id, distribution)
}.toMap
}
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
- val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
- lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
- (0 until numReceivers).map { receiverId =>
- (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
+ val lastReceiverNumRecords = lastReceivedBatch.map(_.receiverNumRecords)
+ val streamIds = ssc.graph.getInputStreams().map(_.id)
+ lastReceiverNumRecords.map { receiverNumRecords =>
+ streamIds.map { id =>
+ (id, receiverNumRecords.getOrElse(id, 0L))
}.toMap
}.getOrElse {
- (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
+ streamIds.map(id => (id, 0L)).toMap
}
}
@@ -164,20 +215,43 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
- def lastCompletedBatch: Option[BatchInfo] = synchronized {
- completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+ def receiverIds(): Iterable[Int] = synchronized {
+ receiverInfos.keys
+ }
+
+ def lastCompletedBatch: Option[BatchUIData] = synchronized {
+ completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
- def lastReceivedBatch: Option[BatchInfo] = synchronized {
+ def lastReceivedBatch: Option[BatchUIData] = synchronized {
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchInfo] = {
- (waitingBatchInfos.values.toSeq ++
- runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
+ private def retainedBatches: Seq[BatchUIData] = {
+ (waitingBatchUIData.values.toSeq ++
+ runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
}
- private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
- Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+ private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
+ Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
}
+
+ def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
+ val batchUIData = waitingBatchUIData.get(batchTime).orElse {
+ runningBatchUIData.get(batchTime).orElse {
+ completedBatchUIData.find(batch => batch.batchTime == batchTime)
+ }
+ }
+ batchUIData.foreach { _batchUIData =>
+ val outputOpIdToSparkJobIds =
+ Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+ _batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
+ }
+ batchUIData
+ }
+}
+
+private[streaming] object StreamingJobProgressListener {
+ type SparkJobId = Int
+ type OutputOpId = Int
}
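batchTimeToOutputOpIdSparkJobIdPair above relies on java.util.LinkedHashMap's removeEldestEntry hook to cap its size at the number of retained batches plus a slack of 10. A minimal, standalone sketch of the same insertion-order eviction pattern, with a fixed bound in place of the listener's dynamic one:

  import java.util.{LinkedHashMap, Map => JMap}

  // A LinkedHashMap that never holds more than `maxEntries` keys, evicting the eldest insertion.
  def boundedMap[K, V](maxEntries: Int): LinkedHashMap[K, V] =
    new LinkedHashMap[K, V] {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxEntries
    }

  val m = boundedMap[Long, String](2)
  m.put(1L, "batch-1")
  m.put(2L, "batch-2")
  m.put(3L, "batch-3")              // evicts the entry for key 1L
  assert(!m.containsKey(1L) && m.size() == 2)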
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 07fa285642eec..db37ae815bdf5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -95,7 +95,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
"Maximum rate\n[events/sec]",
"Last Error"
)
- val dataRows = (0 until listener.numReceivers).map { receiverId =>
+ val dataRows = listener.receiverIds().map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
val receiverActive = receiverInfo.map { info =>
@@ -114,7 +114,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
}.getOrElse(emptyCell)
Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
receivedRecordStats ++ Seq(receiverLastError)
- }
+ }.toSeq
Some(listingTable(headerRow, dataRows))
} else {
None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 9a860ea4a6c68..e4039639adbad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -27,14 +27,16 @@ import StreamingTab._
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
-private[spark] class StreamingTab(ssc: StreamingContext)
+private[spark] class StreamingTab(val ssc: StreamingContext)
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
val parent = getSparkUI(ssc)
val listener = ssc.progressListener
ssc.addStreamingListener(listener)
+ ssc.sc.addSparkListener(listener)
attachPage(new StreamingPage(this))
+ attachPage(new BatchPage(this))
parent.attachTab(this)
def detach() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
similarity index 79%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 38a93cc3c9a1f..9985fedc35141 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
+import java.util.{Iterator => JIterator}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -24,9 +25,9 @@ import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
-import WriteAheadLogManager._
+
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{Logging, SparkConf}
/**
* This class manages write ahead log files.
@@ -34,37 +35,32 @@ import WriteAheadLogManager._
* - Recovers the log files and reads the recovered records upon failures.
* - Cleans up old log files.
*
- * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
- * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
*
* @param logDirectory Directory where rotating log files will be created.
* @param hadoopConf Hadoop configuration for reading/writing log files.
- * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
- * Default is one minute.
- * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
- * Default is three.
- * @param callerName Optional name of the class who is using this manager.
- * @param clock Optional clock that is used to check for rotation interval.
*/
-private[streaming] class WriteAheadLogManager(
+private[streaming] class FileBasedWriteAheadLog(
+ conf: SparkConf,
logDirectory: String,
hadoopConf: Configuration,
- rollingIntervalSecs: Int = 60,
- maxFailures: Int = 3,
- callerName: String = "",
- clock: Clock = new SystemClock
- ) extends Logging {
+ rollingIntervalSecs: Int,
+ maxFailures: Int
+ ) extends WriteAheadLog with Logging {
+
+ import FileBasedWriteAheadLog._
private val pastLogs = new ArrayBuffer[LogInfo]
- private val callerNameTag =
- if (callerName.nonEmpty) s" for $callerName" else ""
+ private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
+
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
override protected val logName = s"WriteAheadLogManager $callerNameTag"
private var currentLogPath: Option[String] = None
- private var currentLogWriter: WriteAheadLogWriter = null
+ private var currentLogWriter: FileBasedWriteAheadLogWriter = null
private var currentLogWriterStartTime: Long = -1L
private var currentLogWriterStopTime: Long = -1L
@@ -75,14 +71,14 @@ private[streaming] class WriteAheadLogManager(
* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
* to HDFS, and will be available for readers to read.
*/
- def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
- var fileSegment: WriteAheadLogFileSegment = null
+ def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
+ var fileSegment: FileBasedWriteAheadLogSegment = null
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
- fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
+ fileSegment = getLogWriter(time).write(byteBuffer)
succeeded = true
} catch {
case ex: Exception =>
@@ -99,6 +95,19 @@ private[streaming] class WriteAheadLogManager(
fileSegment
}
+ def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+ val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment]
+ var reader: FileBasedWriteAheadLogRandomReader = null
+ var byteBuffer: ByteBuffer = null
+ try {
+ reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+ byteBuffer = reader.read(fileSegment)
+ } finally {
+ reader.close()
+ }
+ byteBuffer
+ }
+
/**
* Read all the existing logs from the log directory.
*
@@ -108,12 +117,14 @@ private[streaming] class WriteAheadLogManager(
* the latest records. This does not deal with currently active log files, and
* hence the implementation is kept simple.
*/
- def readFromLog(): Iterator[ByteBuffer] = synchronized {
+ def readAll(): JIterator[ByteBuffer] = synchronized {
+ import scala.collection.JavaConversions._
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+
logFilesToRead.iterator.map { file =>
logDebug(s"Creating log reader with $file")
- new WriteAheadLogReader(file, hadoopConf)
+ new FileBasedWriteAheadLogReader(file, hadoopConf)
} flatMap { x => x }
}
@@ -129,7 +140,7 @@ private[streaming] class WriteAheadLogManager(
* deleted. This should be set to true only for testing. Else the files will be deleted
* asynchronously.
*/
- def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
+ def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -160,7 +171,7 @@ private[streaming] class WriteAheadLogManager(
/** Stop the manager, close any open log writer */
- def stop(): Unit = synchronized {
+ def close(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter.close()
}
@@ -169,7 +180,7 @@ private[streaming] class WriteAheadLogManager(
}
/** Get the current log writer while taking care of rotation */
- private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
+ private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
resetWriter()
currentLogPath.foreach {
@@ -180,7 +191,7 @@ private[streaming] class WriteAheadLogManager(
val newLogPath = new Path(logDirectory,
timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
currentLogPath = Some(newLogPath.toString)
- currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
+ currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
}
currentLogWriter
}
@@ -207,7 +218,7 @@ private[streaming] class WriteAheadLogManager(
}
}
-private[util] object WriteAheadLogManager {
+private[streaming] object FileBasedWriteAheadLog {
case class LogInfo(startTime: Long, endTime: Long, path: String)
@@ -217,6 +228,11 @@ private[util] object WriteAheadLogManager {
s"log-$startTime-$stopTime"
}
+ def getCallerName(): Option[String] = {
+ val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
+ stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
+ }
+
/** Convert a sequence of files to a sequence of sorted LogInfo objects */
def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
files.flatMap { file =>
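FileBasedWriteAheadLog now fills in the abstract WriteAheadLog contract: write returns a file-segment handle, read takes that handle back, readAll replays every retained record, and clean/close manage the lifecycle. A rough usage sketch against the abstract interface, assuming code inside the org.apache.spark.streaming package (WriteAheadLogUtils is package-private) and a placeholder log directory:

  import java.nio.ByteBuffer
  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}

  // "/tmp/wal" is a placeholder; any writable directory works for the file-based log.
  val wal: WriteAheadLog =
    WriteAheadLogUtils.createLogForDriver(new SparkConf(), "/tmp/wal", new Configuration())

  val handle = wal.write(ByteBuffer.wrap("record-1".getBytes("UTF-8")), System.currentTimeMillis())
  val record = wal.read(handle)                         // random access via the returned handle
  val all = wal.readAll()                               // java.util.Iterator over every record
  wal.clean(System.currentTimeMillis() - 60000, false)  // drop data older than a minute, async
  wal.close()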
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
similarity index 83%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index 003989092a42a..f7168229ec15a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -23,16 +23,16 @@ import org.apache.hadoop.conf.Configuration
/**
* A random access reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
- * this reads the record (bytebuffer) from the log file.
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (ByteBuffer) from the log file.
*/
-private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
+private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
extends Closeable {
private val instream = HdfsUtils.getInputStream(path, conf)
private var closed = false
- def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
+ def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
assertOpen()
instream.seek(segment.offset)
val nextLength = instream.readInt()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
similarity index 93%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index 2afc0d1551acf..c3bb59f3fef94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -24,11 +24,11 @@ import org.apache.spark.Logging
/**
* A reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. This reads
* the records (bytebuffers) in the log file sequentially and return them as an
* iterator of bytebuffers.
*/
-private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Configuration)
extends Iterator[ByteBuffer] with Closeable with Logging {
private val instream = HdfsUtils.getInputStream(path, conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
similarity index 86%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
index 1005a2c8ec303..2e1f1528fad20 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
@@ -17,4 +17,5 @@
package org.apache.spark.streaming.util
/** Class for representing a segment of data in a write ahead log file */
-private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
+private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
+ extends WriteAheadLogRecordHandle
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
similarity index 88%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
index 679f6a6dfd7c1..e146bec32a456 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -17,18 +17,17 @@
package org.apache.spark.streaming.util
import java.io._
-import java.net.URI
import java.nio.ByteBuffer
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+import org.apache.hadoop.fs.FSDataOutputStream
/**
* A writer for writing byte-buffers to a write ahead log file.
*/
-private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: Configuration)
extends Closeable {
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
@@ -43,11 +42,11 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
private var closed = false
/** Write the bytebuffer to the log file */
- def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+ def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
assertOpen()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
- val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
+ val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
new file mode 100644
index 0000000000000..7f6ff12c58d47
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SparkException}
+
+/** A helper class with utility functions related to the WriteAheadLog interface */
+private[streaming] object WriteAheadLogUtils extends Logging {
+ val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
+ val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
+ val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
+ val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
+
+ val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
+ val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
+ val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
+
+ val DEFAULT_ROLLING_INTERVAL_SECS = 60
+ val DEFAULT_MAX_FAILURES = 3
+
+ def enableReceiverLog(conf: SparkConf): Boolean = {
+ conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
+ }
+
+ def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = {
+ if (isDriver) {
+ conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ } else {
+ conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ }
+ }
+
+ def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = {
+ if (isDriver) {
+ conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ } else {
+ conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ }
+ }
+
+ /**
+ * Create a WriteAheadLog for the driver. If configured with a custom WAL class, it will try to
+ * create an instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
+ */
+ def createLogForDriver(
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+ createLog(true, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+ }
+
+ /**
+ * Create a WriteAheadLog for the receiver. If configured with a custom WAL class, it will try to
+ * create an instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
+ */
+ def createLogForReceiver(
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+ createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+ }
+
+ /**
+ * Create a WriteAheadLog based on the value of the given config key. The config key is used
+ * to get the class name from the SparkConf. If a class is configured, it will try to
+ * create an instance of that class by first trying `new CustomWAL(sparkConf)` and then falling
+ * back to a no-arg constructor. If both fail, a SparkException is thrown. If no class is
+ * configured, it will create the default FileBasedWriteAheadLog.
+ */
+ private def createLog(
+ isDriver: Boolean,
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+
+ val classNameOption = if (isDriver) {
+ sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)
+ } else {
+ sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)
+ }
+ classNameOption.map { className =>
+ try {
+ instantiateClass(
+ Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException(s"Could not create a write ahead log of class $className", e)
+ }
+ }.getOrElse {
+ new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
+ getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver))
+ }
+ }
+
+ /** Instantiate the class, either using single arg constructor or zero arg constructor */
+ private def instantiateClass(cls: Class[_ <: WriteAheadLog], conf: SparkConf): WriteAheadLog = {
+ try {
+ cls.getConstructor(classOf[SparkConf]).newInstance(conf)
+ } catch {
+ case nsme: NoSuchMethodException =>
+ cls.getConstructor().newInstance()
+ }
+ }
+}
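createLog resolves a user-supplied class reflectively, preferring a single-SparkConf constructor and falling back to a no-arg one. A hedged sketch of the smallest custom implementation that path could load; this in-memory log and its names are illustrative only and not part of this patch:

  import java.nio.ByteBuffer
  import java.util.{ArrayList => JArrayList, Iterator => JIterator}
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

  // Illustrative only: an in-memory WAL exposing the SparkConf constructor the reflection prefers.
  class InMemoryWriteAheadLog(conf: SparkConf) extends WriteAheadLog {
    private case class Handle(index: Int) extends WriteAheadLogRecordHandle
    private val records = new JArrayList[ByteBuffer]()

    override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
      records.add(record)
      Handle(records.size() - 1)
    }
    override def read(handle: WriteAheadLogRecordHandle): ByteBuffer =
      records.get(handle.asInstanceOf[Handle].index)
    override def readAll(): JIterator[ByteBuffer] = records.iterator()
    override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = ()
    override def close(): Unit = records.clear()
  }

Such a class would be selected by setting the driver or receiver writeAheadLog.class config key to its fully qualified name; instantiateClass would then call the SparkConf constructor, or the no-arg constructor if that one were absent.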
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 90340753a4eed..b1adf881dd0f5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,11 +21,13 @@
import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
import scala.Tuple2;
import org.junit.Assert;
@@ -45,6 +47,7 @@
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
+import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -929,7 +932,7 @@ public void testPairMap() { // Maps pair -> pair of different type
public Tuple2 call(Tuple2 in) throws Exception {
return in.swap();
}
- });
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -987,12 +990,12 @@ public void testPairMap2() { // Maps pair -> single
JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream reversed = pairStream.map(
- new Function, Integer>() {
- @Override
- public Integer call(Tuple2 in) throws Exception {
- return in._2();
- }
- });
+ new Function, Integer>() {
+ @Override
+ public Integer call(Tuple2 in) throws Exception {
+ return in._2();
+ }
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1123,7 +1126,7 @@ public void testCombineByKey() {
JavaPairDStream combined = pairStream.combineByKey(
new Function() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -1144,14 +1147,14 @@ public void testCountByValue() {
Arrays.asList("hello"));
List>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2("hello", 1L),
- new Tuple2("world", 1L)),
- Arrays.asList(
- new Tuple2("hello", 1L),
- new Tuple2("moon", 1L)),
- Arrays.asList(
- new Tuple2("hello", 1L)));
+ Arrays.asList(
+ new Tuple2("hello", 1L),
+ new Tuple2("world", 1L)),
+ Arrays.asList(
+ new Tuple2("hello", 1L),
+ new Tuple2("moon", 1L)),
+ Arrays.asList(
+ new Tuple2("hello", 1L)));
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream counted = stream.countByValue();
@@ -1249,17 +1252,17 @@ public void testUpdateStateByKey() {
JavaPairDStream updated = pairStream.updateStateByKey(
new Function2, Optional, Optional>() {
- @Override
- public Optional call(List values, Optional state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional call(List values, Optional state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
});
JavaTestUtils.attachTestOutputStream(updated);
List>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1292,17 +1295,17 @@ public void testUpdateStateByKeyWithInitial() {
JavaPairDStream updated = pairStream.updateStateByKey(
new Function2, Optional, Optional>() {
- @Override
- public Optional call(List values, Optional state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional call(List values, Optional state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
}, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1328,7 +1331,7 @@ public void testReduceByKeyAndWindowWithInverse() {
JavaPairDStream reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1707,6 +1710,74 @@ public Integer call(String s) throws Exception {
Utils.deleteRecursively(tempDir);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testContextGetOrCreate() throws InterruptedException {
+
+ final SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("newContext", "true");
+
+ File emptyDir = Files.createTempDir();
+ emptyDir.deleteOnExit();
+ StreamingContextSuite contextSuite = new StreamingContextSuite();
+ String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
+ String checkpointDir = contextSuite.createValidCheckpoint();
+
+ // Function to create JavaStreamingContext without any output operations
+ // (used to detect the new context)
+ final AtomicBoolean newContextCreated = new AtomicBoolean(false);
+ Function0 creatingFunc = new Function0() {
+ public JavaStreamingContext call() {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(conf, Seconds.apply(1));
+ }
+ };
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration(), true);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
+ Assert.assertTrue("old context not recovered", !newContextCreated.get());
+ ssc.stop();
+
+ // Function to create JavaStreamingContext using existing JavaSparkContext
+ // without any output operations (used to detect the new context)
+ Function creatingFunc2 =
+ new Function