From 24bfa382a43c2fbbf54b24bb8f03766910216490 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 7 Mar 2016 09:37:37 -0500
Subject: [PATCH 1/4] improve the doc for "spark.memory.offHeap.size"

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 0017219e07261..bab7628d2dc2f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -901,7 +901,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
-    The absolute amount of memory in bytes which can be used for off-heap allocation.
+    The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
   </td>

From 2209e345df4636f8fa881b3ad45084b75f9fe3eb Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 7 Mar 2016 14:00:16 -0500
Subject: [PATCH 2/4] fix

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index bab7628d2dc2f..0017219e07261 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -901,7 +901,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
-    The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation.
+    The absolute amount of memory in bytes which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
   </td>
From a8646acf826cfbbabdf0d20129e62a14be404c3b Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Tue, 10 Jan 2017 16:24:02 -0800
Subject: [PATCH 3/4] do not remove a jobset with any failed job from jobset
 to prevent data loss

---
 .../streaming/scheduler/JobScheduler.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b7d114bc16d48..d0e0d4d678016 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -200,19 +200,19 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     job.setEndTime(completedTime)
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
-    if (jobSet.hasCompleted) {
-      jobSets.remove(jobSet.time)
-      jobGenerator.onBatchCompletion(jobSet.time)
-      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-        jobSet.totalDelay / 1000.0, jobSet.time.toString,
-        jobSet.processingDelay / 1000.0
-      ))
-      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
-    }
     job.result match {
       case Failure(e) =>
         reportError("Error running job " + job, e)
       case _ =>
+        if (jobSet.hasCompleted) {
+          jobSets.remove(jobSet.time)
+          jobGenerator.onBatchCompletion(jobSet.time)
+          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+            jobSet.totalDelay / 1000.0, jobSet.time.toString,
+            jobSet.processingDelay / 1000.0
+          ))
+          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+        }
     }
   }

From 465ccc68368da50579c10fa1daf7f46809411670 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 12 Jan 2017 20:17:35 -0800
Subject: [PATCH 4/4] address the comments

---
 .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d0e0d4d678016..2fa3bf7d5230b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -200,6 +200,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     job.setEndTime(completedTime)
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+    }
     job.result match {
       case Failure(e) =>
         reportError("Error running job " + job, e)
@@ -211,7 +214,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
             jobSet.totalDelay / 1000.0, jobSet.time.toString,
             jobSet.processingDelay / 1000.0
           ))
-          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
         }
     }
   }
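
For reference, a sketch of how handleJobCompletion reads once PATCH 3/4 and
PATCH 4/4 are both applied, pieced together from the hunks above. The method
signature and the jobSets.get lookup are assumptions about surrounding code
the diffs do not show; the rest is taken from the patch bodies. Net effect:
a StreamingListenerBatchCompleted event is posted for every completed batch,
but the job set is removed (and onBatchCompletion fired) only when the
completing job succeeded, so a batch with a failed job is not dropped and its
data is not lost.

    // Reconstructed sketch; signature and jobSets.get(job.time) are assumed
    // from context, the body follows the post-patch hunks.
    private def handleJobCompletion(job: Job, completedTime: Long): Unit = {
      val jobSet = jobSets.get(job.time)
      job.setEndTime(completedTime)
      listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
      logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
      if (jobSet.hasCompleted) {
        // PATCH 4/4: listeners always hear that the batch completed,
        // even when one of its jobs failed.
        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
      }
      job.result match {
        case Failure(e) =>
          reportError("Error running job " + job, e)
        case _ =>
          // PATCH 3/4: only a fully successful job set is removed, so a
          // job set containing a failed job stays in jobSets.
          if (jobSet.hasCompleted) {
            jobSets.remove(jobSet.time)
            jobGenerator.onBatchCompletion(jobSet.time)
            logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
              jobSet.totalDelay / 1000.0, jobSet.time.toString,
              jobSet.processingDelay / 1000.0
            ))
          }
      }
    }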