diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
index c893c800181a4..c87031750a4bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
@@ -20,10 +20,10 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
-import org.apache.spark._
 
 private[streaming]
 class HDFSBackedBlockRDDPartition(
@@ -36,7 +36,7 @@ private[streaming]
 class HDFSBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient hadoopConfiguration: Configuration,
-    @transient override val blockIds: Array[BlockId],
+    @transient blockIds: Array[BlockId],
     @transient val segments: Array[WriteAheadLogFileSegment],
     val storeInBlockManager: Boolean,
     val storageLevel: StorageLevel
@@ -46,7 +46,7 @@ class HDFSBackedBlockRDD[T: ClassTag](
     "Number of block ids must be the same as number of segments!")
 
   // Hadoop Configuration is not serializable, so broadcast it as a serializable.
-  val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
+  private val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
 
   override def getPartitions: Array[Partition] = {
     assertValid()
@@ -82,11 +82,19 @@ class HDFSBackedBlockRDD[T: ClassTag](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
+    /*
+     * First, look up the block manager to see if the block is available with Spark itself. If
+     * it is, then return the location of the block. Else find the data nodes where the file
+     * segment is located on HDFS and use those as the preferred locations.
+     */
     val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
-    val locations = getBlockIdLocations()
-    locations.getOrElse(partition.blockId,
-      HdfsUtils.getBlockLocations(partition.segment.path, partition.segment.offset,
-        partition.segment.length, hadoopConfiguration)
-        .getOrElse(new Array[String](0)).toSeq)
+    getBlockIdLocations().get(partition.blockId) match {
+      case Some(locations) => locations
+      case None =>
+        val segment = partition.segment
+        HdfsUtils
+          .getBlockLocations(segment.path, segment.offset, segment.length, hadoopConfiguration)
+          .getOrElse(Seq.empty)
+    }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index ece3c876d0716..c886012906e17 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -53,12 +53,12 @@ private[streaming] object HdfsUtils {
   }
 
   def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration):
-      Option[Array[String]] = {
+      Option[Seq[String]] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
-    blockLocs.map(_.flatMap(_.getHosts))
+    blockLocs.map(_.flatMap(_.getHosts).toSeq)
   }
 
   def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
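
The `getPreferredLocations` change replaces the nested `getOrElse` with an explicit pattern match: locations reported by the block manager win, and the HDFS block locations of the write-ahead-log segment are consulted only as a fallback. A minimal standalone sketch of that resolution order is below; `blockManagerLocations` and `hdfsLocations` are hypothetical stand-ins for `getBlockIdLocations()` and `HdfsUtils.getBlockLocations`, not the Spark API.

```scala
// Illustrative sketch only: mirrors the Some/None fallback in the patched
// getPreferredLocations, using plain Maps instead of the BlockManager and HDFS calls.
object PreferredLocationsSketch {
  // Hypothetical data: blocks cached by Spark and HDFS data-node hosts per block.
  val blockManagerLocations: Map[String, Seq[String]] = Map("block-0" -> Seq("host-a"))
  val hdfsLocations: Map[String, Seq[String]] = Map("block-1" -> Seq("datanode-1", "datanode-2"))

  def preferredLocations(blockId: String): Seq[String] =
    blockManagerLocations.get(blockId) match {
      case Some(locations) => locations // block is available with Spark itself
      case None => hdfsLocations.getOrElse(blockId, Seq.empty) // fall back to HDFS data nodes
    }

  def main(args: Array[String]): Unit = {
    println(preferredLocations("block-0")) // List(host-a)
    println(preferredLocations("block-1")) // List(datanode-1, datanode-2)
    println(preferredLocations("block-2")) // List()
  }
}
```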