Skip to content

Commit

Permalink
[SPARK-23806] Broadcast.unpersist can cause fatal exception when used…
Browse files Browse the repository at this point in the history
… with dynamic allocation
  • Loading branch information
Thomas Graves committed Mar 28, 2018
1 parent 34c4b9c commit 54cab78
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,15 @@ class BlockManagerMasterEndpoint(
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId", e)
0 // zero blocks were removed
}
}.toSeq

Future.sequence(futures)
}

private def removeBlockManager(blockManagerId: BlockManagerId) {
Expand Down

0 comments on commit 54cab78

Please sign in to comment.