Skip to content

Commit

Permalink
Remote BlockFetchTracker.
Browse files Browse the repository at this point in the history
This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, there's
no reason not to merge it into the BlockFetcherIterator trait.
  • Loading branch information
kayousterhout committed Feb 28, 2014
1 parent 5a3ad10 commit 8173939
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 38 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
*/

private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
with Logging with BlockFetchTracker {
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime: Long
def fetchWaitTime: Long
def remoteBytesRead: Long
}


Expand Down Expand Up @@ -233,7 +238,16 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}

//an iterator that will read fetched blocks off the queue as they arrive.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead


// Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
// as they arrive.
@volatile protected var resultsGotten = 0

override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
Expand All @@ -251,14 +265,6 @@ object BlockFetcherIterator {
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}

// Implementing BlockFetchTracker trait.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
}
// End of BasicBlockFetcherIterator

Expand Down

0 comments on commit 8173939

Please sign in to comment.