From 91fa09d75fc853f6a915299ae740e79fc849ba56 Mon Sep 17 00:00:00 2001 From: Haoyuan Li Date: Fri, 28 Mar 2014 14:14:55 -0700 Subject: [PATCH] address patrick's comments --- .../apache/spark/storage/TachyonStore.scala | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index 7dec3f25c192a..ec6759aa8cd0b 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import java.io.IOException import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer @@ -109,13 +110,28 @@ private class TachyonStore( override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val file = tachyonManager.getFile(blockId) + if (file == null || file.getLocationHosts().size == 0) { + return None + } val is = file.getInStream(ReadType.CACHE) var buffer: ByteBuffer = null - if (is != null){ - val size = file.length - val bs = new Array[Byte](size.asInstanceOf[Int]) - is.read(bs, 0, size.asInstanceOf[Int]) - buffer = ByteBuffer.wrap(bs) + try { + if (is != null) { + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) + buffer = ByteBuffer.wrap(bs) + if (fetchSize != size) { + logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size + + " is not equal to fetched size " + fetchSize) + return None + } + } + } catch { + case ioe: IOException => { + logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe) + return None + } } Some(buffer) }