diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index fe73456ef8fad..d8be649f96e5f 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -17,8 +17,7 @@ package org.apache.spark.broadcast -import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream, - ObjectInputStream, ObjectOutputStream, OutputStream} +import java.io._ import scala.reflect.ClassTag import scala.util.Random @@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag]( private val broadcastId = BroadcastBlockId(id) - TorrentBroadcast.synchronized { - SparkEnv.get.blockManager.putSingle( - broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - } + SparkEnv.get.blockManager.putSingle( + broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false) @transient private var arrayOfBlocks: Array[TorrentBlock] = null @transient private var totalBlocks = -1 @@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag]( // Store meta-info val metaId = BroadcastBlockId(id, "meta") val metaInfo = TorrentInfo(null, totalBlocks, totalBytes) - TorrentBroadcast.synchronized { - SparkEnv.get.blockManager.putSingle( - metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - } + SparkEnv.get.blockManager.putSingle( + metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true) // Store individual pieces for (i <- 0 until totalBlocks) { val pieceId = BroadcastBlockId(id, "piece" + i) - TorrentBroadcast.synchronized { - SparkEnv.get.blockManager.putSingle( - pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true) - } + SparkEnv.get.blockManager.putSingle( + pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true) } } @@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag]( val metaId = BroadcastBlockId(id, "meta") var attemptId = 10 while (attemptId > 0 && totalBlocks == -1) { - TorrentBroadcast.synchronized { - SparkEnv.get.blockManager.getSingle(metaId) match { - case Some(x) => - val tInfo = x.asInstanceOf[TorrentInfo] - totalBlocks = tInfo.totalBlocks - totalBytes = tInfo.totalBytes - arrayOfBlocks = new Array[TorrentBlock](totalBlocks) - hasBlocks = 0 - - case None => - Thread.sleep(500) - } + SparkEnv.get.blockManager.getSingle(metaId) match { + case Some(x) => + val tInfo = x.asInstanceOf[TorrentInfo] + totalBlocks = tInfo.totalBlocks + totalBytes = tInfo.totalBytes + arrayOfBlocks = new Array[TorrentBlock](totalBlocks) + hasBlocks = 0 + + case None => + Thread.sleep(500) } attemptId -= 1 } + if (totalBlocks == -1) { return false } @@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag]( val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList) for (pid <- recvOrder) { val pieceId = BroadcastBlockId(id, "piece" + pid) - TorrentBroadcast.synchronized { - SparkEnv.get.blockManager.getSingle(pieceId) match { - case Some(x) => - arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock] - hasBlocks += 1 - SparkEnv.get.blockManager.putSingle( - pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true) + SparkEnv.get.blockManager.getSingle(pieceId) match { + case Some(x) => + arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock] + hasBlocks += 1 + SparkEnv.get.blockManager.putSingle( + pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true) - case None => - throw new SparkException("Failed to get " + pieceId + " of " + broadcastId) - } + case None => + throw new SparkException("Failed to get " + pieceId + " of " + broadcastId) } } @@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging { * If removeFromDriver is true, also remove these persisted blocks on the driver. */ def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = { - synchronized { - SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) - } + SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) } }