[SPARK-43138][CORE] Fix ClassNotFoundException during migration
### What changes were proposed in this pull request?

This PR fixes an unhandled ClassNotFoundException during RDD block decommission migrations.
```
2023-04-08 04:15:11,791 ERROR server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 6425687122551756860
java.lang.ClassNotFoundException: com.class.from.user.jar.ClassName
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
    at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1833)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1658)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:123)
    at org.apache.spark.network.netty.NettyBlockRpcServer.deserializeMetadata(NettyBlockRpcServer.scala:180)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:119)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
The exception occurs when an RDD block contains instances of a user-defined class; it is thrown while deserializing the `ClassTag` for that class on the receiving side. For (de)serializing the `ClassTag`, an instance of `JavaSerializer` (https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L62) is used, but it is never configured with a class loader that includes user-defined classes. This PR solves the issue by instead using a serializer obtained from the `SerializerManager`, which is configured with the correct class loader.
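
To make the failure concrete, here is a minimal, self-contained sketch (illustrative code, not Spark source; the `serialize`/`deserialize` helpers are hypothetical) of how Java deserialization resolves classes through a supplied class loader, analogous to what `JavaDeserializationStream.resolveClass` does. If the loader cannot see the class named in the stream, `readObject` throws the `ClassNotFoundException` seen above:
```scala
import java.io._
import scala.reflect.ClassTag

object ClassTagLoaderSketch {
  // Serialize a ClassTag with plain Java serialization, as the block-upload
  // metadata path does.
  def serialize(tag: ClassTag[_]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(tag)
    oos.close()
    bos.toByteArray
  }

  // Deserialize, resolving classes against an explicit loader, analogous to
  // JavaDeserializationStream. If `loader` cannot see the class recorded in
  // the stream (e.g. a class from a user jar), readObject throws
  // ClassNotFoundException.
  def deserialize(bytes: Array[Byte], loader: ClassLoader): AnyRef = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try ois.readObject() finally ois.close()
  }
}
```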

The reason is this does not occur during normal block replication and only during decommission is that there is a workaround/hack in `BlockManager.doPutIterator` that replaces the `ClassTag` with a `ClassTag[Any]` when replicating that block https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1657-L1664 But during RDD migration (and probably pro-active replication) it will use a different codepath and potentially send the correct ClassTag which leads to the unhandled exception.
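
For background, the `SerializerManager`'s default serializer can resolve these classes because the executor installs a class loader covering user jars and REPL-generated classes as the serializer's default. A rough sketch of that wiring, paraphrased from the executor setup (names simplified, not verbatim source):
```scala
// Paraphrased executor-side wiring: the executor builds a class loader chain
// that can see user jars and REPL classes, then installs it on the default
// serializer.
val urlClassLoader = createClassLoader()                         // user jars
val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) // REPL classes
env.serializer.setDefaultClassLoader(replClassLoader)
// The standalone `new JavaSerializer(conf)` previously used by
// NettyBlockTransferService never received setDefaultClassLoader, so it fell
// back to a loader that cannot see user-defined classes.
```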

### Why are the changes needed?
The unhandled exception means that block replication does not work properly. Specifically, if a block contains a user-defined class and is not replicated at creation, it will never be successfully migrated during decommission.
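
A minimal repro sketch (assumptions: a decommission-enabled cluster, and `Foo` shipped in the user jar rather than on the executors' system classpath):
```scala
import org.apache.spark.storage.StorageLevel

// Foo lives in the user jar / REPL, not on the executors' system classpath.
case class Foo(i: Int)

// Replication 1: the block is not replicated at creation, so it never goes
// through the ClassTag-erasing workaround in doPutIterator.
val rdd = sc.parallelize(1 to 100, 10).map(Foo).persist(StorageLevel.MEMORY_ONLY)
rdd.count()

// When the hosting executor is later decommissioned (e.g. with
// spark.storage.decommission.enabled=true and
// spark.storage.decommission.rddBlocks.enabled=true), block migration sends
// the real ClassTag[Foo], and the receiver previously failed with the
// exception shown above.
```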

### Does this PR introduce _any_ user-facing change?
It fixes the bug. Additionally, since the change replaces a fixed `JavaSerializer` with the serializer provided by the `SerializerManager`, the `NettyBlockTransferService` may now use `KryoSerializer` or another user-configured serializer for the metadata.
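
For example (illustrative only): with this change, a job configured as below serializes the block-upload metadata with Kryo, whereas previously it always used Java serialization:
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// NettyBlockTransferService now obtains its metadata serializer via
//   serializerManager.getSerializer(scala.reflect.classTag[Any], false)
// which returns the configured default serializer (Kryo here) rather than a
// hard-coded JavaSerializer.
```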

### How was this patch tested?
This modifies an existing spec to correctly check that replication happens for REPL-defined classes, while removing the hack that erases the `ClassTag`. Additionally, I tested this manually on a Hadoop cluster to confirm that it also solves the decommission migration issue. If someone can point me to a better way to write a spec using user-defined classes, I would also like to add a unit test for it.

Closes apache#40808 from eejbyfeldt/SPARK-43138-class-not-found-exception.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Emil Ejbyfeldt authored and srowen committed May 11, 2023
1 parent 46251f0 commit 37a0ae3
Showing 9 changed files with 46 additions and 36 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
```diff
@@ -363,8 +363,8 @@ object SparkEnv extends Logging {
       isDriver)
 
     val blockTransferService =
-      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
-        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
+      new NettyBlockTransferService(conf, securityManager, serializerManager, bindAddress,
+        advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(
```
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
```diff
@@ -40,7 +40,7 @@ import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferLis
 import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive
 import org.apache.spark.util.Utils
@@ -51,6 +51,7 @@ import org.apache.spark.util.Utils
 private[spark] class NettyBlockTransferService(
     conf: SparkConf,
     securityManager: SecurityManager,
+    serializerManager: SerializerManager,
     bindAddress: String,
     override val hostName: String,
     _port: Int,
@@ -59,7 +60,7 @@ private[spark] class NettyBlockTransferService(
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
-  private val serializer = new JavaSerializer(conf)
+  private val serializer = serializerManager.getSerializer(scala.reflect.classTag[Any], false)
   private val authEnabled = securityManager.isAuthenticationEnabled()
 
   private[this] var transportContext: TransportContext = _
```
10 changes: 1 addition & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
```diff
@@ -1654,16 +1654,8 @@ private[spark] class BlockManager(
     if (level.replication > 1) {
       val remoteStartTimeNs = System.nanoTime()
       val bytesToReplicate = doGetLocalBytes(blockId, info)
-      // [SPARK-16550] Erase the typed classTag when using default serialization, since
-      // NettyBlockRpcServer crashes when deserializing repl-defined classes.
-      // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
-      val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
-        scala.reflect.classTag[Any]
-      } else {
-        classTag
-      }
       try {
-        replicate(blockId, bytesToReplicate, level, remoteClassTag)
+        replicate(blockId, bytesToReplicate, level, classTag)
       } finally {
         bytesToReplicate.dispose()
       }
```
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
```diff
@@ -38,6 +38,7 @@ import org.apache.spark.internal.config.Network
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
 import org.apache.spark.util.ThreadUtils
 
@@ -126,13 +127,16 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     when(blockManager.getLocalBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+    val serializerManager0 = new SerializerManager(new JavaSerializer(conf0), conf0)
+    val exec0 = new NettyBlockTransferService(
+      conf0, securityManager0, serializerManager0, "localhost", "localhost", 0,
       1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
-      1)
+    val serializerManager1 = new SerializerManager(new JavaSerializer(conf1), conf1)
+    val exec1 = new NettyBlockTransferService(
+      conf1, securityManager1, serializerManager1, "localhost", "localhost", 0, 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {
```
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
```diff
@@ -34,6 +34,7 @@ import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.client.{TransportClient, TransportClientFactory}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 
 class NettyBlockTransferServiceSuite
   extends SparkFunSuite
@@ -142,10 +143,11 @@ class NettyBlockTransferServiceSuite
       rpcEndpointRef: RpcEndpointRef = null): NettyBlockTransferService = {
     val conf = new SparkConf()
       .set("spark.app.id", s"test-${getClass.getName}")
+    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
-    val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
-      port, 1, rpcEndpointRef)
+    val service = new NettyBlockTransferService(
+      conf, securityManager, serializerManager, "localhost", "localhost", port, 1, rpcEndpointRef)
     service.init(blockDataManager)
     service
   }
```
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
```diff
@@ -75,9 +75,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
     conf.set(TEST_MEMORY, maxMem)
     conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val serializerManager = new SerializerManager(serializer, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(store.memoryStore)
```
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
```diff
@@ -131,10 +131,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       None
     }
     val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey)
-    val transfer = transferService
-      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
-    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, bmConf)
+    val transfer = transferService.getOrElse(new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1))
+    val memManager = UnifiedMemoryManager(bmConf, numCores = 1)
     val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
       val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0)
       Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr,
@@ -1308,9 +1308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -1357,8 +1358,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None
     val securityMgr = new SecurityManager(conf, ioEncryptionKey)
     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-    val transfer =
-      new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
@@ -2193,9 +2194,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     case class User(id: Long, name: String)
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+    val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
@@ -2216,9 +2218,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       = createKryoSerializerWithDiskCorruptedInputStream()
 
     conf.set(TEST_MEMORY, 1200L)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+    val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, None)
```
14 changes: 10 additions & 4 deletions repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
```diff
@@ -344,13 +344,19 @@ class SingletonReplSuite extends SparkFunSuite {
         |}
         |import org.apache.spark.storage.StorageLevel._
         |case class Foo(i: Int)
-        |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
-        |ret.count()
-        |val res = sc.getRDDStorageInfo.filter(_.id == ret.id).map(_.numCachedPartitions).sum
+        |val rdd1 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY)
+        |val rdd2 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY_2)
+        |rdd1.count()
+        |rdd2.count()
+        |val cached1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.numCachedPartitions).sum
+        |val size1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.memSize).sum
+        |val size2 = sc.getRDDStorageInfo.filter(_.id == rdd2.id).map(_.memSize).sum
+        |assert(size2 == size1 * 2, s"Blocks not replicated properly size1=$size1, size2=$size2")
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 10", output)
+    assertContains("cached1: Int = 10", output)
+    assertDoesNotContain("AssertionError", output)
   }
 
   test("should clone and clean line object in ClosureCleaner") {
```
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
```diff
@@ -288,7 +288,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val transfer = new NettyBlockTransferService(
+      conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
     memManager.setMemoryStore(blockManager.memoryStore)
```
