Allow nextBatchStream to be called after we're done looking at all streams

Previously, this could throw an ArrayIndexOutOfBoundsException after we read the
very last element.
mateiz committed Jul 30, 2014
1 parent a34b352 commit fa2e8db
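
For context, here is a minimal, self-contained sketch of the failure mode the commit message describes (the object name and batch sizes are hypothetical, not Spark's actual values): each call bumps a counter and then indexes an array of recorded batch sizes, so one call past the final batch indexes beyond the end of the array.

    // Scala sketch of the pre-fix overread; all values here are made up.
    object OverreadDemo {
      def main(args: Array[String]): Unit = {
        val serializerBatchSizes = Array(100L, 100L, 40L) // three recorded batches
        var batchStreamsRead = 0

        // Mirrors the old logic: increment first, then index with no bounds check.
        def nextBatchSize(): Long = {
          batchStreamsRead += 1
          serializerBatchSizes(batchStreamsRead - 1)
        }

        serializerBatchSizes.indices.foreach(_ => nextBatchSize()) // indices 0..2: fine
        nextBatchSize() // 4th call indexes 3: ArrayIndexOutOfBoundsException
      }
    }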
Showing 2 changed files with 9 additions and 3 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -17,10 +17,11 @@

 package org.apache.spark.rdd
 
+import scala.language.existentials
+
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

@@ -501,8 +501,13 @@ private[spark] class ExternalSorter[K, V, C](

 /** Construct a stream that only reads from the next batch */
 def nextBatchStream(): InputStream = {
-  batchStreamsRead += 1
-  ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
+  if (batchStreamsRead < spill.serializerBatchSizes.length) {
+    batchStreamsRead += 1
+    ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
+  } else {
+    // No more batches left; give an empty stream
+    bufferedStream
+  }
 }
 
 /**
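
The guarded version returns bufferedStream once every batch has been handed out; by that point the shared underlying stream has presumably been read to the end, so callers see immediate EOF rather than an exception. Below is a runnable sketch of the same pattern (the class, data, and batch sizes are invented for illustration; ByteStreams.limit is Guava's real API, the same call the patch uses):

    import java.io.{ByteArrayInputStream, InputStream}

    import com.google.common.io.ByteStreams

    // Sketch of the fixed guard; byte data and batch sizes are made up.
    object GuardedBatchStreams {
      private val bufferedStream: InputStream =
        new ByteArrayInputStream(Array.fill[Byte](12)(1)) // 12 bytes = 5 + 4 + 3
      private val serializerBatchSizes = Array(5L, 4L, 3L)
      private var batchStreamsRead = 0

      def nextBatchStream(): InputStream = {
        if (batchStreamsRead < serializerBatchSizes.length) {
          batchStreamsRead += 1
          ByteStreams.limit(bufferedStream, serializerBatchSizes(batchStreamsRead - 1))
        } else {
          // Past the last batch the shared stream is already exhausted, so it
          // acts as an empty stream instead of indexing past the array's end.
          bufferedStream
        }
      }

      def main(args: Array[String]): Unit = {
        // One call per batch plus one extra; before the fix, the extra call threw.
        (0 to serializerBatchSizes.length).foreach { _ =>
          println(ByteStreams.toByteArray(nextBatchStream()).length) // 5, 4, 3, 0
        }
      }
    }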
