[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target #2366

Closed · wants to merge 16 commits · showing changes from 7 commits
94 changes: 76 additions & 18 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -22,7 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.concurrent.ExecutionContext.Implicits.global

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
@@ -111,6 +111,8 @@ private[spark] class BlockManager(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
private val cachedPeers = new HashSet[BlockManagerId]
Contributor: when do you ever remove entries from here?

Contributor: ah, I see, you clear the hashset down there ...

private var lastPeerFetchTime = 0L

initialize()

@@ -786,32 +788,88 @@ private[spark] class BlockManager(
updatedBlocks
}

/**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
Contributor: can you rename this updatePeersFromMaster? the current name seems to suggest it is a really cheap getter.

Contributor: also update the comment to say more, like noting that this fetches an updated list from the driver.

Contributor: actually -- never mind my renaming comment, since this uses the cache. you should still update the comment to state what this does though.

Contributor (Author): updatePeersFromMaster is a misnomer, as it does not actually query the master unless the parameter is true or the TTL has expired (which I have increased to 60 seconds). MOST of the time it is a cheap operation, so I am not sure what's best here.

I am totally open to more suggestions. How about keeping it getPeers with the param named forceUpdateFromMaster?

val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
Contributor (Author): Is 1000 ms good enough to limit traffic? My logic is that if there are 100 nodes in the cluster, 100 messages per second is cheap enough for the driver to handle. Also, this will occur only in streaming, which actively uses replication. If there is no replication being used, this is inactive.

def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl

cachedPeers.synchronized {
Contributor: you should also do the timeout check in the synchronized block, because otherwise two racing requests will immediately send two requests to the driver, and the requests are kind of expensive.

Contributor (Author): Good catch!

Contributor: Ah yes, good point.

if (cachedPeers.isEmpty || forceFetch || timeout) {
cachedPeers.clear()
cachedPeers ++= master.getPeers(blockManagerId)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
}
cachedPeers
}
Contributor: There is an MT bug here. Since cachedPeers is updated in place, it is possible for a 'previous' invocation to be using cachedPeers while the next invocation is clearing/updating it.

We can avoid that by overwriting the cachedPeers instance variable with the result of master.getPeers.

Contributor (Author): Good point. Then we would need a separate locking object for synchronizing this.
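A minimal sketch of what the two suggestions above add up to, assuming the surrounding BlockManager members (conf, master, blockManagerId, logDebug); the lock-object name peerFetchLock and the 60-second default are illustrative, not the code in this commit. The staleness check happens inside the lock, and the peer list is replaced wholesale as an immutable Seq instead of being cleared and refilled in place.

    // Sketch only: combines the two review suggestions above.
    private val peerFetchLock = new Object                      // dedicated lock object, as suggested
    @volatile private var cachedPeers: Seq[BlockManagerId] = null
    private var lastPeerFetchTime = 0L

    private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
      peerFetchLock.synchronized {
        val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // ms, illustrative default
        val timedOut = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
        // Timeout check is inside the lock, so two racing callers cannot both hit the driver.
        if (cachedPeers == null || forceFetch || timedOut) {
          // Overwrite the reference instead of mutating the collection, so a caller still
          // iterating over the previously returned Seq is unaffected.
          cachedPeers = master.getPeers(blockManagerId)
          lastPeerFetchTime = System.currentTimeMillis
          logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
        }
        cachedPeers
      }
    }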


/**
* Replicate block to another node.
Contributor: maybe add to the comment that this is blocking

Contributor (Author): Done.

*/
@volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersReplicatedTo = new HashSet[BlockManagerId]
val peersFailedToReplicateTo = new HashSet[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
val startTime = System.nanoTime
Contributor: why nano instead of current milli?

Contributor (Author): Because the existing code did so. Changing to milli.

Contributor: i guess this hasn't happened yet? :p

Contributor (Author): Sorry :P


var forceFetchPeers = false
var failures = 0
var done = false

// Get a random peer
def getRandomPeer(): Option[BlockManagerId] = {
val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
s"To node: $peer")

try {
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
} catch {
case e: Exception =>
logError(s"Failed to replicate block to $peer", e)
// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
while (!done) {
Contributor: can we get rid of done and just use

while (peersReplicatedTo.size < numPeersToReplicateTo && failures <= maxReplicationFailures) {

otherwise we have to track the place where done is updated to find out when it is done.

Contributor: ah, it's trickier to handle the None case.

OK, in that case let's keep the done, but do comment explicitly on the three conditions under which this will terminate.

Contributor (Author): I added comments before the while, as well as at all three places where done is set to true.

getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.nanoTime
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
.format((System.nanoTime - onePeerStartTime) / 1e6))
peersReplicatedTo += peer
forceFetchPeers = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true
}
} catch {
case e: Exception =>
Contributor: can you add tests that exercise connection failures?

you can create a block transfer service impl that throws errors in specific conditions

Contributor (Author): Do you have sample code for that?
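A rough, hypothetical sketch of the kind of fault-injecting stub described above. SimpleBlockTransfer is a simplified stand-in for the real BlockTransferService interface (which has more methods than shown here); the wrapper delegates normal uploads and throws for hosts the test marks as dead.

    import java.nio.ByteBuffer
    import org.apache.spark.storage.StorageLevel

    // Hypothetical, simplified stand-in for the real transfer service interface (sketch only).
    trait SimpleBlockTransfer {
      def uploadBlockSync(host: String, port: Int, blockId: String,
          data: ByteBuffer, level: StorageLevel): Unit
    }

    // Test double: behaves like the wrapped service, but fails uploads to designated hosts.
    class FailureInjectingBlockTransfer(delegate: SimpleBlockTransfer, deadHosts: Set[String])
      extends SimpleBlockTransfer {

      override def uploadBlockSync(host: String, port: Int, blockId: String,
          data: ByteBuffer, level: StorageLevel): Unit = {
        if (deadHosts.contains(host)) {
          throw new java.io.IOException(s"Simulated failure uploading $blockId to $host:$port")
        }
        delegate.uploadBlockSync(host, port, blockId, data, level)
      }
    }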

logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
forceFetchPeers = true
peersFailedToReplicateTo += peer
Contributor: Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short TTL (to temporarily blacklist replication to the peer). But that can be done in a future PR.
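A sketch of the short-TTL blacklist idea mentioned here (explicitly deferred to a future PR), using only the standard library; the class name and API are illustrative. Replication could then skip candidates for which isBlacklisted returns true, in addition to peersFailedToReplicateTo.

    import scala.collection.mutable

    // Illustrative only: remembers failed peers for a bounded interval.
    class PeerBlacklist(ttlMs: Long) {
      private val failedAt = new mutable.HashMap[BlockManagerId, Long]

      def add(peer: BlockManagerId): Unit = synchronized {
        failedAt(peer) = System.currentTimeMillis
      }

      def isBlacklisted(peer: BlockManagerId): Boolean = synchronized {
        val now = System.currentTimeMillis
        // Expired entries are simply ignored; a production version would also prune them.
        failedAt.get(peer).exists(failedTime => now - failedTime < ttlMs)
      }
    }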

Contributor: Btw, curious: will replication fail only when the remote peer is dead (and so requiring forceFetchPeers)? What about inability to add the block on the remote peer? Will that cause an exception to be raised here?

Essentially, I am trying to understand if an exception raised here always means the remote peer is 'dead'. An alternative might be to list peers which have at least data.rewind().remaining() space available, but we don't support that IIRC (and it can get used up before we make this call anyway, I guess).

Contributor (Author): I agree that there may be other reasons for failure to send to a remote node. Even in those cases, the current behavior of re-fetching the peer list and sending to another node is correct, just not the most efficient. This optimization is something that can be addressed in a future PR.

if (failures > maxReplicationFailures) {
done = true
}
}
case None =>
// no peer left to replicate to
done = true
Contributor: What if the initial list had only self in the executor list and we are within the TTL (so getPeers returns an empty list)? Bootstrapping time, for example. Do we want to check if the server has updates for us? This will kind of hose our TTL though ... but maybe it's a corner case.

Or is this handled already? Thanks

Contributor (Author): The initial list won't ever have self, as the BlockManagerMaster returns the list of nodes excluding the id of the node requesting it (that is, self). Nonetheless getPeers can return an empty list (e.g., local mode, with only one BlockManager), and that can also happen at bootstrapping time. However, current Spark already suffers from this problem; in fact it's much worse. Currently, the peer list is only fetched once from the master (upon the first replication) and is never updated ever again! So this patch is a strict improvement, as the peer list is updated, by default, every minute.

}

logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logError(s"Replicated $blockId of ${data.limit()} bytes to only " +
s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " +
s"in ${(System.nanoTime - startTime) / 1e6} ms")
} else {
logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms")
}
}

@@ -59,6 +59,8 @@ class BlockManagerId private (

def port: Int = port_

def isDriver = (executorId == "<driver>")
Contributor: +1 on this. I added this TODO a while back and I think this also affects some UI code.

Contributor: write out the return type Boolean

Contributor (Author): Right!
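For reference, a one-line sketch of the requested change, with the return type written out explicitly:

    def isDriver: Boolean = executorId == "<driver>"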


override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
@@ -84,13 +84,8 @@ class BlockManagerMaster(
}

/** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
}
result
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)

case GetPeers(blockManagerId, size) =>
sender ! getPeers(blockManagerId, size)
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetMemoryStatus =>
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
// TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || info.blockManagerId.executorId != "<driver>"
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.executorId == "<driver>" && !isLocal
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}

private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray

val selfIndex = peers.indexOf(blockManagerId)
/** Get the list of the peers of the given block manager */
Contributor: Maybe add a comment to explain that this excludes self

Contributor (Author): "peers" automatically means that it does not include self; that should be obvious.

private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
val selfIndex = blockManagerIds.indexOf(blockManagerId)
Contributor: Why not just do contains? Then we don't need to convert this to an array and use indexOf

Contributor (Author): Yeah, I had just kept the existing code here as is. Will change; it's obviously better.

if (selfIndex == -1) {
throw new SparkException("Self index for " + blockManagerId + " not found")
logError("Self index for " + blockManagerId + " not found")
Seq.empty
} else {
// If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
// Then this code will return the list [ id3 id4 id5 id1 ]
Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
Contributor: Here we can just subtract it from the set instead of doing this complicated logic, right?

Contributor: or through filterNot

Contributor (Author): Again, I had just preserved the existing logic here. However, since I am changing the overall logic from deterministic peer selection (where I wanted to maintain the order in which peers are returned, so that no two nodes use the same third node as a replication target) to random peer selection, I think your suggestion makes sense.

}
}

// Note that this logic will select the same node multiple times if there aren't enough peers
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
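A sketch of the filterNot-based simplification the reviewers are converging on, assuming the same blockManagerInfo map and logging as the surrounding actor; this is not the code in this commit.

    // Sketch: exclude the driver and the requesting block manager without index arithmetic.
    private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
      val blockManagerIds = blockManagerInfo.keySet
      if (blockManagerIds.contains(blockManagerId)) {
        blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
      } else {
        // The requesting block manager is unknown to the master; return no peers.
        logError("Getting peers of unknown block manager " + blockManagerId)
        Seq.empty
      }
    }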

@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {

case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(bm.isDriver, "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")