[SPARK-43138][CORE] Fix ClassNotFoundException during migration #40808
Conversation
cc @Ngone51 @jiangxb1987 FYI
Small ping on this. The PR was previously a draft since I was waiting to test it on a real workload, but that has now been done.
I'm always uneasy about changing low-level serialization, but this looks reasonable
@@ -51,6 +51,7 @@ import org.apache.spark.util.Utils
private[spark] class NettyBlockTransferService(
    conf: SparkConf,
    securityManager: SecurityManager,
    serializerManager: SerializerManager,
Can the class just instantiate a Java serializer internally rather than have the caller pass it all the time? or I'm missing that it's not always the same serializer, or has to be from the caller.
The reason for using the serializerManager is that we get a configured serializer that is aware of user-defined classes. It is configured here:
spark/core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines 168 to 172 in 3cdbc17
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
// SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)
If we were to construct a JavaSerializer internally, we would need to expose some method so we can configure the classloader that gets used by it.
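A minimal sketch of the distinction (not the PR's actual change; SparkConf, JavaSerializer and setDefaultClassLoader are real Spark APIs, while the user-jar URL is a made-up stand-in):

```scala
import java.net.{URL, URLClassLoader}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object SerializerWiringSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // A serializer built only from `conf` resolves classes with the default classloader.
    val plain = new JavaSerializer(conf)

    // The executor-configured path: the classloader that knows about REPL / user classes
    // is installed explicitly (the URL below is a hypothetical stand-in for the user jars).
    val userLoader = new URLClassLoader(Array(new URL("file:/tmp/user-jars/app.jar")),
      getClass.getClassLoader)
    val configured = new JavaSerializer(conf)
    configured.setDefaultClassLoader(userLoader)

    // Only `configured` (like env.serializerManager on the executor) can deserialize a
    // ClassTag that references a class living solely on the user classpath.
  }
}
```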
That config comes through conf right? I am just wondering what the difference is if you still make the JavaSerializer and serializer manager inside the class rather than make the caller do it now?
Yeah, I think I see what you are aiming for now.
Seems that the classloader also depends on the userClassPath passed to the executor constructor here:
userClassPath: Seq[URL] = Nil,
spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Lines 1524 to 1538 in 27b0c5e
def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
  Client.getUserClasspath(conf).map { uri =>
    val inputPath = uri.getPath
    val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
      Client.getClusterPath(conf, inputPath)
    } else {
      // Any other URI schemes should have been resolved by this point
      assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
        "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
      inputPath
    }
    val envVarResolvedFilePath = YarnSparkHadoopUtil.replaceEnvVars(replacedFilePath, sys.env)
    Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
  }
}
so I don't see how to easily construct and correctly configure the serializer directly here.
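Roughly, the executor builds its user/REPL classloader from those URLs before installing it on the serializer and serializer manager; a simplified sketch of that dependency, assuming a plain URLClassLoader stands in for Spark's MutableURLClassLoader:

```scala
import java.net.{URL, URLClassLoader}

object UserClassLoaderSketch {
  // Simplified stand-in for how the executor derives its user/REPL classloader from the
  // userClassPath URLs before handing it to env.serializer / env.serializerManager.
  // The real Executor uses Spark's MutableURLClassLoader and also honours
  // spark.executor.userClassPathFirst.
  def buildUserClassLoader(userClassPath: Seq[URL], parent: ClassLoader): ClassLoader =
    if (userClassPath.isEmpty) parent
    else new URLClassLoader(userClassPath.toArray, parent)
}
```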
I think I'm asking something narrower, and maybe still misinformed.
Right now you've basically lifted "new JavaSerializer(conf)" into the caller, where it's wrapped in a SerializerManager, then passed back in. NettyBlockTransferService, for instance, already takes 'conf' as an argument. What's different if you make the JavaSerializer and SerializerManager inside the class, from 'conf'? I'm just missing how it's different.
I see that this isn't quite the case in a few tests though, OK. Maybe that's reason enough for this setup, OK.
Merged to master
What changes were proposed in this pull request?
This PR fixes an unhandled ClassNotFoundException during RDD block decommission migrations.
The exception occurs, if the RDD block contains user-defined classes, during the serialization of a ClassTag for the user-defined class. The problem is that the serialization of the ClassTag is done by an instance of JavaSerializer that is not aware of user-defined classes (spark/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala, line 62 in ca2ddf3).
The reason this does not occur during normal block replication, but only during decommission, is that there is a workaround/hack in BlockManager.doPutIterator that replaces the ClassTag with a ClassTag[Any] when replicating the block (spark/core/src/main/scala/org/apache/spark/storage/BlockManager.scala, lines 1657 to 1664 in ca2ddf3).
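To make the failure mode concrete, here is a hedged sketch of the metadata round-trip the block-transfer service performs when uploading a block (names simplified; UserRecord is a hypothetical stand-in for a class that, on a real cluster, is only reachable through the user/REPL classloader):

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.StorageLevel

object ClassTagMetadataSketch {
  // Hypothetical stand-in for a user-defined class that is not visible to the
  // executor's default classloader on a real cluster.
  case class UserRecord(id: Int)

  def main(args: Array[String]): Unit = {
    val serializer = new JavaSerializer(new SparkConf()).newInstance()

    // Upload side: the storage level and the block's ClassTag are Java-serialized
    // together as the transfer metadata.
    val metadata =
      serializer.serialize((StorageLevel.MEMORY_ONLY, implicitly[ClassTag[UserRecord]]))

    // Receiving side: the metadata is deserialized again. If the ClassTag points at a
    // class the serializer's classloader cannot see, this step is where the unhandled
    // ClassNotFoundException surfaced during decommission migration.
    val (level, tag) = serializer.deserialize[(StorageLevel, ClassTag[_])](metadata)
    println(s"level=$level, tag=$tag")
  }
}
```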
Why are the changes needed?
The unhandled exception means that block replication does not work properly. Specifically, in cases where the block contains a user-defined class and is not replicated at creation, the block will never be successfully migrated during decommission.
Does this PR introduce any user-facing change?
It fixes the bug. But since it also changes from a fixed JavaSerializer to instead use the SerializerManager, the NettyBlockTransferService might now use KryoSerializer or some other user-configured serializer for the metadata.
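For illustration, the "user configured serializer" here is the one selected through the standard spark.serializer setting; a sketch, not part of this PR:

```scala
import org.apache.spark.SparkConf

object SerializerConfigExample {
  // Illustration only: with this common setting, the serializer that SerializerManager
  // hands back for the transfer metadata can be Kryo instead of the previously
  // hard-coded JavaSerializer.
  val conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```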
How was this patch tested?
This modifies an existing spec to correctly check that replication happens for REPL-defined classes, while removing the hack that erases the ClassTag. Additionally, I tested this manually on a Hadoop cluster to check that it also solves the decommission migration issue. If someone can point me to a better way to add a spec using user-defined classes, I would also like to add a unit test for it.