
[SPARK-43138][CORE] Fix ClassNotFoundException during migration #40808

Conversation

eejbyfeldt

@eejbyfeldt eejbyfeldt commented Apr 16, 2023

What changes were proposed in this pull request?

This PR fixes an unhandled ClassNotFoundException during RDD block decommission migrations.

2023-04-08 04:15:11,791 ERROR server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 6425687122551756860
java.lang.ClassNotFoundException: com.class.from.user.jar.ClassName
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
    at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1833)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1658)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:123)
    at org.apache.spark.network.netty.NettyBlockRpcServer.deserializeMetadata(NettyBlockRpcServer.scala:180)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:119)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)

The exception occurs when an RDD block contains a user-defined class, during the deserialization of the ClassTag for that class. The ClassTag is serialized with a plain instance of JavaSerializer (

private val serializer = new JavaSerializer(conf)

), which is never configured with a class loader that includes user-defined classes. This PR fixes the issue by instead using a serializer from the SerializerManager, which is configured with the correct class loader.
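A rough sketch of the direction of the fix (the wrapper class name here is illustrative, not the exact diff; SerializerManager.getSerializer is an existing Spark API):

import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.serializer.SerializerManager

// Illustrative sketch: instead of holding its own `new JavaSerializer(conf)`,
// the RPC server asks the SerializerManager for a serializer. The
// SerializerManager's default class loader is set to the user/REPL class
// loader at executor startup, so user-defined classes referenced by the
// ClassTag can be resolved.
class MetadataDeserializerSketch(serializerManager: SerializerManager) {
  def deserializeMetadata[T: ClassTag](metadata: ByteBuffer): T = {
    serializerManager
      .getSerializer(implicitly[ClassTag[T]], autoPick = true)
      .newInstance()
      .deserialize(metadata)
  }
}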

The reason this does not occur during normal block replication, but only during decommission, is that there is a workaround/hack in BlockManager.doPutIterator that replaces the ClassTag with ClassTag[Any] when replicating the block:

// [SPARK-16550] Erase the typed classTag when using default serialization, since
// NettyBlockRpcServer crashes when deserializing repl-defined classes.
// TODO(ekl) remove this once the classloader issue on the remote end is fixed.
val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
  scala.reflect.classTag[Any]
} else {
  classTag
}
But RDD block migration during decommission (and probably proactive replication) uses a different code path and may send the actual ClassTag, which leads to the unhandled exception.

Why are the changes needed?

The unhandled exception means that block replication does not work properly. Specifically, if a block contains a user-defined class and is not replicated at creation, it will never be successfully migrated during decommission.

Does this PR introduce any user-facing change?

It fixes the bug. But since it switches from a fixed JavaSerializer to the SerializerManager, the NettyBlockTransferService might now use KryoSerializer or another user-configured serializer for the metadata.
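For example (illustrative, not part of this PR), a job configured with Kryo as the default serializer may now have its block-transfer metadata serialized with Kryo as well, when the ClassTag allows it:

import org.apache.spark.SparkConf

// Illustrative only: with this setting, the SerializerManager may pick
// KryoSerializer for block-transfer metadata where a plain JavaSerializer
// was always used before this change.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")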

How was this patch tested?

This PR modifies an existing spec to correctly check that replication happens for REPL-defined classes, while removing the hack that erases the ClassTag. Additionally, I tested this manually on a Hadoop cluster to confirm that it also solves the decommission migration issue. If someone can point me to a better way to write a spec using user-defined classes, I would also like to add a unit test for it.

@eejbyfeldt eejbyfeldt changed the title [SPARK-43138[: Fix ClaassNotFoundException during migration [SPARK-43138][CORE][WIP]: Fix ClaassNotFoundException during migration Apr 16, 2023
@eejbyfeldt eejbyfeldt changed the title [SPARK-43138][CORE][WIP]: Fix ClaassNotFoundException during migration [SPARK-43138][CORE][WIP]: Fix ClassNotFoundException during migration Apr 16, 2023
@HyukjinKwon
Member

cc @Ngone51 @jiangxb1987 FYI

@eejbyfeldt eejbyfeldt changed the title [SPARK-43138][CORE][WIP]: Fix ClassNotFoundException during migration [SPARK-43138][CORE]: Fix ClassNotFoundException during migration Apr 24, 2023
@eejbyfeldt eejbyfeldt marked this pull request as ready for review April 24, 2023 13:22
@eejbyfeldt
Author

Small ping on this. The PR was previously a draft since I was waiting to test it on a real workload, but that has now been done.

Member

@srowen srowen left a comment

I'm always uneasy about changing low-level serialization, but this looks reasonable

@@ -51,6 +51,7 @@ import org.apache.spark.util.Utils
 private[spark] class NettyBlockTransferService(
     conf: SparkConf,
     securityManager: SecurityManager,
+    serializerManager: SerializerManager,
Member

Can the class just instantiate a Java serializer internally rather than have the caller pass it in every time? Or am I missing that it's not always the same serializer, or that it has to come from the caller?

Author

@eejbyfeldt eejbyfeldt May 9, 2023

The reason for using the serializerManager is that we get a configured serializer that is aware of user-defined classes. It is configured here:

// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
// SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)
so if we just construct a JavaSerializer internally, we need to expose some method to configure the class loader it uses.
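A minimal illustration of the underlying problem, assuming a hypothetical userJarLoader that can see the user's jar:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Hypothetical: a class loader that includes the user's jar.
val userJarLoader: ClassLoader = getClass.getClassLoader

val ser = new JavaSerializer(new SparkConf())
// Without this call, deserializing a ClassTag for a class that only exists
// in the user jar throws ClassNotFoundException, because the serializer
// falls back to the default class loader.
ser.setDefaultClassLoader(userJarLoader)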

Member

That config comes through conf, right? I am just wondering what the difference is if you still make the JavaSerializer and SerializerManager inside the class rather than making the caller do it now?

Author

Yeah, I think I see what you are aiming for now.

It seems that the class loader also depends on the userClassPath passed to the executor constructor here, and exactly where that comes from depends on the scheduler backend. For example, for YARN it seems to be this function:
def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
  Client.getUserClasspath(conf).map { uri =>
    val inputPath = uri.getPath
    val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
      Client.getClusterPath(conf, inputPath)
    } else {
      // Any other URI schemes should have been resolved by this point
      assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
        "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
      inputPath
    }
    val envVarResolvedFilePath = YarnSparkHadoopUtil.replaceEnvVars(replacedFilePath, sys.env)
    Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
  }
}

so I don't see how to easily construct and correctly configure the serializer directly here.

Member

I think I'm asking something narrower, and maybe still misinformed.

Right now you've basically lifted "new JavaSerializer(conf)" into the caller, where it's wrapped in a SerializerManager, then passed back in. NettyBlockTransferService, for instance, already takes 'conf' as an argument. What's different if you make the JavaSerializer and SerializerManager inside the class, from 'conf'? I'm just missing how it's different.

I see that this isn't quite the case in a few tests though. Maybe that's reason enough for this setup, OK.

@HyukjinKwon HyukjinKwon changed the title [SPARK-43138][CORE]: Fix ClassNotFoundException during migration [SPARK-43138][CORE] Fix ClassNotFoundException during migration May 11, 2023
@srowen
Member

srowen commented May 11, 2023

Merged to master

@srowen srowen closed this in 37a0ae3 May 11, 2023