diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 28ab9dc074232..272a0a6332bbe 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -363,8 +363,8 @@ object SparkEnv extends Logging {
       isDriver)
 
     val blockTransferService =
-      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
-        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
+      new NettyBlockTransferService(conf, securityManager, serializerManager, bindAddress,
+        advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index a418cb2bf444f..d04d2eeef0b1b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -40,7 +40,7 @@ import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferLis
 import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive
 import org.apache.spark.util.Utils
@@ -51,6 +51,7 @@ import org.apache.spark.util.Utils
 private[spark] class NettyBlockTransferService(
     conf: SparkConf,
     securityManager: SecurityManager,
+    serializerManager: SerializerManager,
     bindAddress: String,
     override val hostName: String,
     _port: Int,
@@ -59,7 +60,7 @@ private[spark] class NettyBlockTransferService(
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
-  private val serializer = new JavaSerializer(conf)
+  private val serializer = serializerManager.getSerializer(scala.reflect.classTag[Any], false)
   private val authEnabled = securityManager.isAuthenticationEnabled()
 
   private[this] var transportContext: TransportContext = _
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a8f74ef179ba3..b4453b4d35e66 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1654,16 +1654,8 @@ private[spark] class BlockManager(
     if (level.replication > 1) {
       val remoteStartTimeNs = System.nanoTime()
       val bytesToReplicate = doGetLocalBytes(blockId, info)
-      // [SPARK-16550] Erase the typed classTag when using default serialization, since
-      // NettyBlockRpcServer crashes when deserializing repl-defined classes.
-      // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
-      val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
-        scala.reflect.classTag[Any]
-      } else {
-        classTag
-      }
       try {
-        replicate(blockId, bytesToReplicate, level, remoteClassTag)
+        replicate(blockId, bytesToReplicate, level, classTag)
       } finally {
         bytesToReplicate.dispose()
       }
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 13bb811b840d5..85b05cd5f98fa 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.internal.config.Network
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
 import org.apache.spark.util.ThreadUtils
 
@@ -126,13 +127,16 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     when(blockManager.getLocalBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+    val serializerManager0 = new SerializerManager(new JavaSerializer(conf0), conf0)
+    val exec0 = new NettyBlockTransferService(
+      conf0, securityManager0, serializerManager0, "localhost", "localhost", 0,
       1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
-      1)
+    val serializerManager1 = new SerializerManager(new JavaSerializer(conf1), conf1)
+    val exec1 = new NettyBlockTransferService(
+      conf1, securityManager1, serializerManager1, "localhost", "localhost", 0, 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index 3a6bc47257fdf..62105f1d51465 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.client.{TransportClient, TransportClientFactory}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 
 class NettyBlockTransferServiceSuite
@@ -142,10 +143,11 @@ class NettyBlockTransferServiceSuite
      rpcEndpointRef: RpcEndpointRef = null): NettyBlockTransferService = {
     val conf = new SparkConf()
       .set("spark.app.id", s"test-${getClass.getName}")
+    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
-    val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
-      port, 1, rpcEndpointRef)
+    val service = new NettyBlockTransferService(
+      conf, securityManager, serializerManager, "localhost", "localhost", port, 1, rpcEndpointRef)
     service.init(blockDataManager)
     service
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 14e1ee5b09d59..8729ae1edfbf6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -75,9 +75,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
     conf.set(TEST_MEMORY, maxMem)
     conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val serializerManager = new SerializerManager(serializer, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager,
       mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(store.memoryStore)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cc1c01d80cb30..295924347658c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -131,10 +131,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       None
     }
     val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey)
-    val transfer = transferService
-      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
-    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, bmConf)
+    val transfer = transferService.getOrElse(new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1))
+    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
       val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0)
       Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr,
@@ -1308,9 +1308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -1357,8 +1358,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None
     val securityMgr = new SecurityManager(conf, ioEncryptionKey)
     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-    val transfer =
-      new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER,
       rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker,
@@ -2193,9 +2194,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     case class User(id: Long, name: String)
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -2216,9 +2218,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       = createKryoSerializerWithDiskCorruptedInputStream()
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index 4795306692f7a..0e3bfcfa89ddd 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -344,13 +344,19 @@ class SingletonReplSuite extends SparkFunSuite {
        |}
        |import org.apache.spark.storage.StorageLevel._
        |case class Foo(i: Int)
-       |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
-       |ret.count()
-       |val res = sc.getRDDStorageInfo.filter(_.id == ret.id).map(_.numCachedPartitions).sum
+       |val rdd1 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY)
+       |val rdd2 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY_2)
+       |rdd1.count()
+       |rdd2.count()
+       |val cached1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.numCachedPartitions).sum
+       |val size1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.memSize).sum
+       |val size2 = sc.getRDDStorageInfo.filter(_.id == rdd2.id).map(_.memSize).sum
+       |assert(size2 == size1 * 2, s"Blocks not replicated properly size1=$size1, size2=$size2")
      """.stripMargin)
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 10", output)
+    assertContains("cached1: Int = 10", output)
+    assertDoesNotContain("AssertionError", output)
  }
 
  test("should clone and clean line object in ClosureCleaner") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index dcf82d5e2c28e..1913552ceed18 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -288,7 +288,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(blockManager.memoryStore)
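
Illustration (not part of the patch): a minimal sketch of how the reworked constructor is wired up, mirroring the test setup in the diff above; the `spark.app.id` value is a placeholder. Because `NettyBlockTransferService` now calls `serializerManager.getSerializer(classTag[Any], autoPick = false)`, it receives the default serializer the `SerializerManager` was constructed with, so block transfer follows whatever serializer SparkEnv configured rather than a hard-coded `JavaSerializer`.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}

val conf = new SparkConf().set("spark.app.id", "wiring-sketch")
val securityMgr = new SecurityManager(conf)
// The serializer passed here is what getSerializer(classTag[Any], false)
// hands back inside NettyBlockTransferService.
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
// Same 7-argument call shape the test suites in this patch use; the trailing
// driver endpoint parameter is left at its default.
val transfer = new NettyBlockTransferService(
  conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)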