Fix the bug that BatchedWriteAheadLog.deaggregate doesn't restore the position

zsxwing committed Dec 2, 2015
1 parent a5d965c commit 81d1812
Showing 2 changed files with 7 additions and 10 deletions.
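
For context, the failure this commit addresses comes from plain java.nio.ByteBuffer semantics: reading or copying a buffer's contents advances its position, so a later consumer of the same buffer sees nothing unless the position is restored first. A minimal, self-contained sketch (JDK-only, independent of the Spark classes changed below):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object PositionRestoreDemo {
      def main(args: Array[String]): Unit = {
        val buffer = ByteBuffer.wrap("single-record-payload".getBytes(StandardCharsets.UTF_8))

        // Simulate the failing path: copying the contents advances the position.
        val prevPosition = buffer.position()
        val copy = new Array[Byte](buffer.remaining())
        buffer.get(copy)
        println(buffer.remaining())  // 0 -- a fallback reader of `buffer` would see nothing

        // The fix applied below: restore the position before handing the buffer back.
        buffer.position(prevPosition)
        println(buffer.remaining())  // back to the full payload length
      }
    }
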
ReceivedBlockTracker.scala

@@ -211,7 +211,7 @@ private[streaming] class ReceivedBlockTracker(
     writeAheadLogOption.foreach { writeAheadLog =>
       logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
       writeAheadLog.readAll().asScala.foreach { byteBuffer =>
-        logTrace("Recovering record " + byteBuffer)
+        logInfo("Recovering record " + byteBuffer)
         Utils.deserialize[ReceivedBlockTrackerLogEvent](
           JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
           case BlockAdditionEvent(receivedBlockInfo) =>
BatchedWriteAheadLog.scala

@@ -28,6 +28,7 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -197,17 +198,10 @@ private[util] object BatchedWriteAheadLog {
    */
   case class Record(data: ByteBuffer, time: Long, promise: Promise[WriteAheadLogRecordHandle])

-  /** Copies the byte array of a ByteBuffer. */
-  private def getByteArray(buffer: ByteBuffer): Array[Byte] = {
-    val byteArray = new Array[Byte](buffer.remaining())
-    buffer.get(byteArray)
-    byteArray
-  }
-
   /** Aggregate multiple serialized ReceivedBlockTrackerLogEvents in a single ByteBuffer. */
   def aggregate(records: Seq[Record]): ByteBuffer = {
     ByteBuffer.wrap(Utils.serialize[Array[Array[Byte]]](
-      records.map(record => getByteArray(record.data)).toArray))
+      records.map(record => JavaUtils.bufferToArray(record.data)).toArray))
   }
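
The removed getByteArray helper always allocated a fresh array and copied the buffer into it; JavaUtils.bufferToArray, which replaces it here and in ReceivedBlockTracker, is meant to skip that copy when the ByteBuffer is already backed by an exactly-sized array. The sketch below is only an approximation of that behaviour for illustration, not the Spark implementation; the point to note is that the copying branch still consumes the buffer via get, which is why deaggregate in the next hunk has to remember and restore the position.

    import java.nio.ByteBuffer

    object BufferToArraySketch {
      // Approximation of the expected JavaUtils.bufferToArray behaviour (an
      // assumption for illustration): reuse the backing array when it exactly
      // spans the buffer's contents, otherwise copy, which advances the
      // buffer's position.
      def bufferToArray(buffer: ByteBuffer): Array[Byte] = {
        if (buffer.hasArray() && buffer.arrayOffset() == 0 &&
            buffer.array().length == buffer.remaining()) {
          buffer.array()
        } else {
          val bytes = new Array[Byte](buffer.remaining())
          buffer.get(bytes)
          bytes
        }
      }
    }
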
@@ -216,10 +210,13 @@
    * method therefore needs to be backwards compatible.
    */
   def deaggregate(buffer: ByteBuffer): Array[ByteBuffer] = {
+    val prevPosition = buffer.position()
     try {
-      Utils.deserialize[Array[Array[Byte]]](getByteArray(buffer)).map(ByteBuffer.wrap)
+      Utils.deserialize[Array[Array[Byte]]](JavaUtils.bufferToArray(buffer)).map(ByteBuffer.wrap)
     } catch {
       case _: ClassCastException => // users may restart a stream with batching enabled
+        // Restore `position` so that the user can read `buffer` later
+        buffer.position(prevPosition)
         Array(buffer)
     }
   }
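
To see the whole backwards-compatibility path end to end: when a buffer holds a single pre-batching record rather than a serialized Array[Array[Byte]], Utils.deserialize fails with a ClassCastException after the buffer may already have been consumed, and the catch branch hands that same buffer back to the caller, which only works if the position has been restored. The following self-contained sketch mirrors that logic with plain JDK serialization standing in for the Spark utilities (hypothetical names, not the Spark code itself):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    object DeaggregateSketch {
      private def serialize(o: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(o)
        oos.close()
        bos.toByteArray
      }

      private def deserialize[T](bytes: Array[Byte]): T = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try ois.readObject().asInstanceOf[T] finally ois.close()
      }

      // Mirrors the fixed deaggregate: remember the position up front and restore
      // it on the fallback path so the returned buffer is still readable.
      def deaggregate(buffer: ByteBuffer): Array[ByteBuffer] = {
        val prevPosition = buffer.position()
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes)  // consumes the buffer, like the copying path of bufferToArray
        try {
          deserialize[Array[Array[Byte]]](bytes).map(b => ByteBuffer.wrap(b))
        } catch {
          case _: ClassCastException =>
            buffer.position(prevPosition)  // the fix: hand back a readable buffer
            Array(buffer)
        }
      }

      def main(args: Array[String]): Unit = {
        // A "legacy" non-batched record: a single serialized String instead of Array[Array[Byte]].
        val legacy = ByteBuffer.wrap(serialize("one-old-style-record"))
        val restored = deaggregate(legacy)
        // Without buffer.position(prevPosition) this would print 0 and the record would be lost.
        println(restored.head.remaining())
      }
    }

The try branch only succeeds for buffers that were produced by aggregate; anything else falls through to the restored original buffer, exactly as the comment about restarted streams in the diff describes.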
