diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index e05db236addca..b5d865bcd1820 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -302,7 +302,11 @@ abstract class DStream[T: ClassTag] ( " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) - Some(newRDD) + if (newRDD.partitions.size > 0) { + Some(newRDD) + } else { + None + } case None => None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 7cd4554282ca1..1b912c73c21b6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -37,7 +37,11 @@ class TransformedDStream[U: ClassTag] ( override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - Some(transformFunc(parentRDDs, validTime)) + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).filter(_ != null).toSeq + if (parentRDDs.size > 0) { + Some(transformFunc(parentRDDs, validTime)) + } else { + None + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 57429a15329a1..994f7cba360a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -48,8 +48,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from a parent for unifying at time " - + validTime) + case None => rdds }) if (rdds.size > 0) { Some(new UnionRDD(ssc.sc, rdds))