Skip to content

Commit

Permalink
Refine comment
Browse files Browse the repository at this point in the history
  • Loading branch information
caneGuy committed Jan 23, 2018
1 parent d109cce commit 3087208
Showing 1 changed file with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream: Option[DeserializationStream] = None
private var deserializeStream: DeserializationStream = null
private var nextItem: (K, C) = null
private var objectsRead = 0

/**
* Construct a stream that reads only from the next batch.
*/
private def nextBatchStream(): Option[DeserializationStream] = {
private def nextBatchStream(): DeserializationStream = {
// Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream.isDefined) {
deserializeStream.get.close()
if (deserializeStream != null) {
deserializeStream.close()
fileStream.close()
deserializeStream = None
deserializeStream = null
fileStream = null
}

Expand All @@ -493,11 +493,11 @@ class ExternalAppendOnlyMap[K, V, C](

val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
Some(ser.deserializeStream(wrappedStream))
ser.deserializeStream(wrappedStream)
} else {
// No more batches left
cleanup()
None
null
}
}

Expand All @@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def readNextItem(): (K, C) = {
try {
val k = deserializeStream.get.readKey().asInstanceOf[K]
val c = deserializeStream.get.readValue().asInstanceOf[C]
val k = deserializeStream.readKey().asInstanceOf[K]
val c = deserializeStream.readValue().asInstanceOf[C]
val item = (k, c)
objectsRead += 1
if (objectsRead == serializerBatchSize) {
Expand All @@ -527,10 +527,10 @@ class ExternalAppendOnlyMap[K, V, C](

override def hasNext: Boolean = {
if (nextItem == null) {
if (deserializeStream.isEmpty) {
// In case that deserializeStream has not been initialized
if (deserializeStream == null) {
// In case of deserializeStream has not been initialized
deserializeStream = nextBatchStream()
if (deserializeStream.isEmpty) {
if (deserializeStream == null) {
return false
}
}
Expand All @@ -540,8 +540,8 @@ class ExternalAppendOnlyMap[K, V, C](
}

override def next(): (K, C) = {
if (deserializeStream.isEmpty) {
// In case that deserializeStream has not been initialized when call next() directly
if (deserializeStream == null) {
// In case of deserializeStream has not been initialized when call next() directly
deserializeStream = nextBatchStream()
}
val item = if (nextItem == null) readNextItem() else nextItem
Expand All @@ -554,10 +554,9 @@ class ExternalAppendOnlyMap[K, V, C](

private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
if (ds.isDefined) {
ds.get.close()
deserializeStream = None
if (deserializeStream != null) {
deserializeStream.close()
deserializeStream = null
}
if (fileStream != null) {
fileStream.close()
Expand Down

0 comments on commit 3087208

Please sign in to comment.