[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target #2366
Changes from 7 commits
File: BlockManager.scala
@@ -22,7 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
@@ -111,6 +111,8 @@ private[spark] class BlockManager(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
private val cachedPeers = new HashSet[BlockManagerId]
private var lastPeerFetchTime = 0L

initialize()
@@ -786,32 +788,88 @@ private[spark] class BlockManager(
updatedBlocks
}

/**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
Review comments:
- Can you rename this updatePeersFromMaster? The current name seems to suggest it is a really cheap getter.
- Also update the comment to say more, like that this is fetching an updated list from the driver.
- Actually -- never mind my renaming comment, since this uses the cache. You should still update the comment to state what this does, though.
- I am totally open to more suggestions. How about keeping it
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
Review comment:
- Is 1000 ms good enough to limit traffic? My logic is that if there are 100 nodes in the cluster, 100 messages per second is cheap enough for the driver to handle. Also, this will occur only in streaming, which actively uses replication. If there is no replication being used, this is inactive.
def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl

cachedPeers.synchronized {
Review comments:
- You should also do the timeout check in the synchronized block - because otherwise two racing requests will immediately send two requests to the driver, and the requests are kind of expensive.
- Good catch!
- Ah yes, good point.
if (cachedPeers.isEmpty || forceFetch || timeout) {
cachedPeers.clear()
cachedPeers ++= master.getPeers(blockManagerId)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
}
cachedPeers
}
Review comments:
- There is an MT bug here. We can avoid that by overwriting the cachedPeers instance variable with the result of master.getPeers.
- Good point. Then we would need a separate locking object for synchronizing this.
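A minimal sketch of the fix being discussed: keep the cached peer list as an immutable Seq that is swapped atomically under a dedicated lock object, instead of mutating a shared HashSet in place. The PeerId/master stand-ins and the peerFetchLock name are illustrative, not the PR's actual code.

```scala
// Sketch: immutable peer list swapped under a dedicated lock, so callers never
// observe a collection that is being cleared and refilled concurrently.
case class PeerId(host: String, port: Int)
object master { def getPeers(): Seq[PeerId] = Seq.empty } // stand-in for master.getPeers(blockManagerId)

object PeerCache {
  private val peerFetchLock = new Object
  @volatile private var cachedPeers: Seq[PeerId] = Seq.empty
  @volatile private var lastPeerFetchTime = 0L
  private val cachedPeersTtl = 1000 // ms, mirrors spark.storage.cachedPeersTtl

  def getPeers(forceFetch: Boolean): Seq[PeerId] = peerFetchLock.synchronized {
    // Staleness check and refresh both happen while holding the lock, so two
    // racing callers cannot each decide the cache is stale and both hit the driver.
    val timedOut = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers.isEmpty || forceFetch || timedOut) {
      cachedPeers = master.getPeers() // replace the reference, never mutate in place
      lastPeerFetchTime = System.currentTimeMillis
    }
    cachedPeers // an immutable Seq is safe to hand out to callers
  }
}
```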
/**
* Replicate block to another node.
Review comments:
- Maybe add to the comment that this is blocking.
- Done.
*/
@volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersReplicatedTo = new HashSet[BlockManagerId]
val peersFailedToReplicateTo = new HashSet[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
val startTime = System.nanoTime
Review comments:
- Why nano instead of current milli?
- Because the existing code did so. Changing to milli.
- I guess this hasn't happened yet? :p
- Sorry :P
var forceFetchPeers = false
var failures = 0
var done = false

// Get a random peer
def getRandomPeer(): Option[BlockManagerId] = {
val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
s"To node: $peer")
try {
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
} catch {
case e: Exception =>
logError(s"Failed to replicate block to $peer", e)
// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
while (!done) {
Review comments:
- Can we get rid of done and just use while (peersReplicatedTo.size < numPeersToReplicateTo && failures <= maxReplicationFailures)? Otherwise we have to track the places where done is updated to find out when it is done.
- Ah, it's trickier to handle the None case. OK, in that case let's keep the done, but do comment explicitly on the three conditions under which this will terminate.
- I added comments before the while, as well as at all the 3 places where
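For reference, a compact sketch of the loop shape being debated: the two counting conditions live in the while condition and the "no peer left" case breaks out explicitly. getRandomPeer and tryUpload here are stand-ins for the code in this diff, not real Spark APIs.

```scala
import scala.util.control.Breaks.{break, breakable}

// Sketch of the alternative loop: termination conditions are (1) enough peers
// replicated to, (2) too many failures, (3) no eligible peer left.
def replicateLoop(
    numPeersToReplicateTo: Int,
    maxReplicationFailures: Int,
    getRandomPeer: () => Option[String],
    tryUpload: String => Boolean): Unit = {
  var failures = 0
  val peersReplicatedTo = scala.collection.mutable.HashSet.empty[String]
  breakable {
    while (peersReplicatedTo.size < numPeersToReplicateTo && failures <= maxReplicationFailures) {
      getRandomPeer() match {
        case Some(peer) =>
          if (tryUpload(peer)) peersReplicatedTo += peer else failures += 1
        case None =>
          break() // no peer left to replicate to
      }
    }
  }
}
```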
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.nanoTime
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
.format((System.nanoTime - onePeerStartTime) / 1e6))
peersReplicatedTo += peer
forceFetchPeers = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true
}
} catch {
case e: Exception =>
Review comments:
- Can you add tests that would test connection failures? You can create a block transfer service impl that throws errors in specific conditions.
- Do you have sample code for that?
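A rough illustration of that suggestion, assuming nothing about the real BlockTransferService trait beyond the uploadBlockSync call used in this diff; the UploadService trait and FailingUploadService names are invented for the sketch.

```scala
import java.nio.ByteBuffer

// Minimal stand-in for the upload side of the block transfer service; only the
// uploadBlockSync shape used in replicate() is mirrored here.
trait UploadService {
  def uploadBlockSync(host: String, port: Int, blockId: String,
      data: ByteBuffer, level: AnyRef): Unit
}

// Test double that fails uploads to a configurable set of hosts, so the failure
// path (failures, forceFetchPeers, peersFailedToReplicateTo) can be exercised.
class FailingUploadService(failedHosts: Set[String], delegate: UploadService)
    extends UploadService {
  override def uploadBlockSync(host: String, port: Int, blockId: String,
      data: ByteBuffer, level: AnyRef): Unit = {
    if (failedHosts.contains(host)) {
      throw new java.io.IOException(s"Simulated upload failure to $host")
    }
    delegate.uploadBlockSync(host, port, blockId, data, level)
  }
}
```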
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
forceFetchPeers = true
peersFailedToReplicateTo += peer
Review comments:
- Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short TTL (to temporarily blacklist replication to that peer).
- Btw, curious - will replication fail only when the remote peer is dead (and so requires forceFetchPeers)? Essentially I am trying to understand whether an Exception raised here always means the remote peer is 'dead'.
- I agree that there may be other reasons for failure to send to a remote node. Even in those cases, the current behavior of re-fetching the peer list and sending to another node is correct. Just not the most efficient. This optimization is something that can be addressed in a future PR.
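To make the blacklist-with-TTL idea concrete, here is a hypothetical helper (not part of this PR; all names are invented): it remembers when a peer last failed and skips it until the entry expires.

```scala
// Hypothetical TTL-based peer blacklist: a peer that failed recently is skipped
// until its entry expires, then it becomes eligible again.
class PeerBlacklist[P](ttlMs: Long) {
  private val failedAt = scala.collection.mutable.HashMap.empty[P, Long]

  def markFailed(peer: P): Unit = synchronized {
    failedAt(peer) = System.currentTimeMillis
  }

  def isBlacklisted(peer: P): Boolean = synchronized {
    failedAt.get(peer) match {
      case Some(t) if System.currentTimeMillis - t < ttlMs => true
      case Some(_) => failedAt.remove(peer); false // entry expired, forget it
      case None => false
    }
  }
}
```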
if (failures > maxReplicationFailures) {
done = true
}
}
case None =>
// no peer left to replicate to
done = true
Review comments:
- What if the initial list had only self in the executor list and we are within the TTL (and so getPeers returns an empty list) - at bootstrapping time, for example. Or is this handled already? Thanks
- The initial list won't ever have self, as the BlockManagerMaster returns the list of nodes excluding the id of the node requesting it (that is, self). Nonetheless getPeers can return an empty list (e.g., local mode, with only one BlockManager), and that can also happen at bootstrapping time. However, current Spark already suffers from this problem. In fact it's much worse: currently, the peer list is only fetched once from the master (upon first-time replication) and is never updated ever again! So this patch is a strict improvement, as the peer list is updated, by default, every minute.
}

logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logError(s"Replicated $blockId of ${data.limit()} bytes to only " +
s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " +
s"in ${(System.nanoTime - startTime) / 1e6} ms")
} else {
logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms")
}
}
File: BlockManagerId.scala
@@ -59,6 +59,8 @@ class BlockManagerId private (

def port: Int = port_

def isDriver = (executorId == "<driver>")
Review comments:
- +1 on this. I added this TODO a while back and I think this also affects some UI code.
- Write out the return type Boolean.
- Right!
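With the return type written out, the declaration would read as follows (a one-line sketch of the requested change):

```scala
def isDriver: Boolean = executorId == "<driver>"
```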
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
File: BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)

case GetPeers(blockManagerId, size) =>
sender ! getPeers(blockManagerId, size)
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetMemoryStatus =>
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
// TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || info.blockManagerId.executorId != "<driver>"
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.executorId == "<driver>" && !isLocal
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}

private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray

val selfIndex = peers.indexOf(blockManagerId)
/** Get the list of the peers of the given block manager */
Review comments:
- Maybe add a comment to explain that this excludes self.
- "Peers" automatically means that it does not include self. It should be obvious.
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
val selfIndex = blockManagerIds.indexOf(blockManagerId)
Review comments:
- Why not just do
- Yeah, I had just kept the existing code here as is. Will change, obviously better.
if (selfIndex == -1) {
throw new SparkException("Self index for " + blockManagerId + " not found")
logError("Self index for " + blockManagerId + " not found")
Seq.empty
} else {
// If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
// Then this code will return the list [ id3 id4 id5 id1 ]
Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
Review comments:
- Here we can just subtract it from the set instead of doing this complicated logic, right?
- Or through
- Again, I had just preserved the existing logic here. However, since I am changing the overall logic from deterministic peer selection (where I wanted to maintain the order they are returned in, so that no two nodes use the same 3rd node as a replication target) to random peer selection, I think your suggestion makes sense.
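A sketch of that simplification, assuming the surrounding BlockManagerMasterActor context from this diff (blockManagerInfo, BlockManagerId.isDriver): once the caller picks peers at random, the master no longer needs the rotation around selfIndex and can simply drop the driver and the requester from the set.

```scala
// Sketch: drop the driver and the requesting block manager instead of rotating
// an array around selfIndex; ordering no longer matters with random selection.
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  blockManagerInfo.keySet
    .filterNot(_.isDriver)          // never offer the driver as a replication target
    .filterNot(_ == blockManagerId) // "peers" excludes the requester itself
    .toSeq
}
```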
}
}

// Note that this logic will select the same node multiple times if there aren't enough peers
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
Review comments:
- When do you ever remove entries from here?
- Ah, I see - you clear the hashset down there ...