Bypass init-containers if spark.jars and spark.files is empty or only has local:// URIs #348
Conversation
@mccheah Would you like to take a look?
Thanks for the contribution @chenchun! Definitely appreciate seeing more and more people using this project! For this PR, at first glance it looks like the scalastyle check is failing because one of the lines is too long; see
http://spark-k8s-jenkins.pepperdata.org:8080/job/PR-spark-k8s-full-build/569/consoleFull#1143296497853a0453-9a85-4740-a867-694552c49a93
Would you mind updating the PR to pass scalastyle? You can run it locally with …
}
}
var podBuilder = basePod
var sparkConfWithExecutorInit = sparkConf
Prefer using val over var where possible.
if (e.nonEmpty && !e.startsWith("local://")) {
  needInitContainer = true
}
}
Can do something like:

val needInitContainer = (resolvedSparkJars ++ resolvedSparkFiles).exists { e =>
  e.nonEmpty && !e.startsWith("local://")
}
needInitContainer = true
}
}
var podBuilder = basePod |
Write the logic such that this can be kept as a val. There are a few options, but here's one:

val (sparkConfWithExecutorInit, podBuilderWithInitContainer) =
  if (needInitContainer) { ... } else (sparkConf, basePod)
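Spelled out, the pattern is an if-expression that returns a tuple, so both results stay immutable. A minimal self-contained sketch with hypothetical stand-in types, not the project's real builders:

// Hypothetical stand-ins for the real pod builder and Spark conf types.
case class Conf(settings: Map[String, String])
case class Pod(containers: List[String])

def configureDriver(
    needInitContainer: Boolean,
    sparkConf: Conf,
    basePod: Pod): (Conf, Pod) = {
  if (needInitContainer) {
    // Hypothetical init-container wiring for illustration only.
    (Conf(sparkConf.settings + ("init-container" -> "enabled")),
      Pod("spark-init" :: basePod.containers))
  } else {
    (sparkConf, basePod)
  }
}

// Both names stay vals at the call site:
// val (sparkConfWithExecutorInit, podBuilder) =
//   configureDriver(needInitContainer, conf, pod)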
val credentialsSecret = credentialsMounter.createCredentialsSecret()
val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
  podWithInitContainer, driverContainer.getName, credentialsSecret)
podBuilder = credentialsMounter.mountDriverKubernetesCredentials( |
Again, use val everywhere and don't re-assign here. It might be preferable to have the switch on whether init-containers are needed handled inside one of the existing modules or in a new module. See …
@@ -168,24 +168,34 @@ private[spark] class Client(
val initContainerConfigMap = initContainerComponentsProvider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And we shouldn't be making a config map if we're not using an init-container. In general we should try to encapsulate everything related to whether or not we need an init-container in one place, so that we only have to make the check once and run everything that relates to that check there.
Thanks for the review. I haven't had a chance to test it yet, but is this the right direction? @mccheah
override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
new ExecutorInitContainerConfigurationImpl(
if (needInitContainer) new ExecutorInitContainerConfigurationImpl( |
I think the repeated instances of checking this variable indicate we need to do some refactoring here. I already had a hunch that we had done too much sharding and fragmentation in this architecture, and this repeated logic seems to confirm that theory. In v1 we had the extreme of a particularly monolithic class hierarchy that was difficult to read because we had everything in one place, but in this code we appear to have gone to the opposite extreme by trying to make every trait do exactly one thing; that may have been too granular and results in problems like this.
I wonder if you could come up with a design such that we only have to switch once. One example would be a single method in this provider class that creates an init container "bundle", which can be a case class / struct of all the things required to set up one of these init containers. Then we can have a switch that provides an Option of one of these bundles, and either returns all of the components or none of them.
We don't actually have to merge the classes themselves, though in some cases that might make sense. We particularly need to isolate the SparkPodInitContainerBootstrap because it's a component that's re-used for both the driver pod and the executor pods. But there might be other cases where it makes more sense to combine some of the classes.
I'm fairly open to any ideas on how to make this code better, but can you see what can be done here?
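One possible shape of that bundle, sketched with placeholder types (the real components live in their own modules; these names and fields are assumptions for illustration):

// Placeholder stand-ins for the real init-container components.
case class InitContainerConfigMap(properties: Map[String, String])
case class ExecutorInitConf(properties: Map[String, String])
case class PodBootstrap(initContainerImage: String)

// Everything required to set up an init-container, grouped in one struct.
case class InitContainerBundle(
    configMap: InitContainerConfigMap,
    executorInitConf: ExecutorInitConf,
    podBootstrap: PodBootstrap)

// The single switch: callers get all of the components or none of them.
def provideInitContainerBundle(uris: Iterable[String]): Option[InitContainerBundle] =
  if (uris.exists(uri => uri.nonEmpty && !uri.startsWith("local://"))) {
    Some(InitContainerBundle(
      InitContainerConfigMap(Map("download-timeout-minutes" -> "5")),
      ExecutorInitConf(Map("init-container" -> "enabled")),
      PodBootstrap("spark-init:latest")))
  } else {
    None
  }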
cc @ifilonenko who has also been working on this code.
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
private val needInitContainer = (resolvedSparkJars ++ resolvedSparkFiles).exists { e =>
e.nonEmpty && !e.startsWith("local://") |
Try to refrain from this check by leveraging the functions already provided here, or, if the use case is specific enough, try adding the function to the Utils for re-usability within the system.
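For illustration, a minimal sketch of the kind of reusable helper being suggested (the names and placement are assumptions, not the actual KubernetesFileUtils API):

// Hypothetical helper: by this project's convention, only local:// URIs
// point at files already present on the Docker image.
def isContainerLocal(uri: String): Boolean = uri.startsWith("local://")

// Bypass the init-container only when every non-empty URI is container-local.
def needInitContainer(uris: Iterable[String]): Boolean =
  uris.exists(uri => uri.nonEmpty && !isContainerLocal(uri))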
Thanks for your comments. I think I'm starting to get into Scala. @mccheah @ifilonenko PTAL.
@@ -105,7 +105,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)

override def provideInitContainerConfigMapBuilder(
private def provideInitContainerConfigMapBuilder(
This is causing compile errors in the unit tests (as seen here in ClientV2Suite); please modify ClientV2Suite to account for these changes.
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(
containerLocalizedFilesResolver.resolveSubmittedSparkJars() |
Good use of KubernetesFileUtils; however, we are already calling this in Client.scala. Maybe pass those jars down so we don't have to re-run the same computation in multiple places.
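Concretely, the suggestion amounts to resolving once in Client.scala and threading the result through; a rough sketch of the wiring, reusing method names that appear elsewhere in this PR:

// In Client.scala: resolve the submitted jars and files once...
val resolvedUris = containerLocalizedFilesResolver.resolveSubmittedSparkJars() ++
  containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
// ...then hand them to the provider instead of re-resolving there:
val maybeBundle = initContainerComponentsProvider
  .provideInitContainerBundle(maybeSubmittedResourceIds, resolvedUris)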
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles( |
Why local://? What if it includes hdfs://, s3://, or whatnot? @mccheah, is the intention here to bypass only if everything is local://, or if everything is non-file://? In that case the logic here should change to not just be a check for != "local".
If there are HDFS or other remote URIs, we still need the init-container to download them, because while they might not be needed for Spark-specific things, they're still needed for the application itself. For example, if my main class is in a jar stored in HDFS, the jar needs to be downloaded before the JVM can launch at all.
Therefore bypassing is only guaranteed to be safe if all dependencies are located on the Docker image or can afford to be downloaded after the JVM starts running (via SparkContext.addJar).
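To make the distinction concrete, a rough classification of the schemes (illustrative only, not the project's actual resolver logic):

// local://  -> already on the Docker image; nothing to fetch
// file://   -> on the submitter's machine; must be staged, then downloaded
// hdfs://, http://, s3a://, ... -> remote; must be fetched before the JVM
//   starts if, for example, the main application jar lives there
val uris = Seq("local:///opt/spark/app.jar", "hdfs://namenode:8020/jars/app.jar")
val needsInitContainer = uris.exists(uri => !uri.startsWith("local://"))
// needsInitContainer == true: the HDFS jar must be downloaded up front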
Force-pushed from a07409a to bf54fd2.
@ifilonenko Addressed your comments.
A couple minor suggestions from me, then I'm good to merge.
@mccheah any last bits from you?
All, let's aim to merge in the next 24hr
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
Some(InitContainerBundle(provideInitContainerConfigMapBuilder(maybeSubmittedResourceIds), |
Maybe should call .build() on provideInitContainerConfigMapBuilder(maybeSubmittedResourceIds), since all clients immediately call .build() on this value.
I'll leave it as it is, because it seems @mccheah will do a refactor afterwards.
I think the suggestion above is still worth addressing.
Updated.
val initContainerConfigMapSeq = maybeInitContainerConfigMap match {
  case Some(configMap) => Seq(configMap)
  case None => Seq()
}
Does .toSeq work here?
Yes, it works. Updated the PR.
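For reference, Option.toSeq yields a one-element sequence for Some and an empty one for None, so the pattern match quoted above collapses to a one-liner:

Some(42).toSeq            // Seq(42)
(None: Option[Int]).toSeq // Seq()

// Equivalent to the match above:
val initContainerConfigMapSeq = maybeInitContainerConfigMap.toSeq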
@@ -224,7 +232,7 @@ private[spark] class Client(
  .watch(loggingPodStatusWatcher)) { _ =>
  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
  try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
val driverOwnedResources = initContainerConfigMapSeq ++
Inline initContainerConfigMapSeq; no need to declare a variable used only once.
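Combined with the .toSeq suggestion above, the single-use val disappears entirely; roughly (otherOwnedResources is a hypothetical placeholder for the remaining resources in the diff):

val driverOwnedResources = maybeInitContainerConfigMap.toSeq ++
  otherOwnedResources // hypothetical placeholder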
Like the idea - I think this actually addresses point 2 on #354 where we should return either all the init-container things or none of them.
@@ -202,4 +201,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
    configMapKey,
    resourceStagingServerSecretPlugin)
}

override def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
Move this argument down a line and put each argument on one line.
Force-pushed from 17fdc74 to c466ef6.
Thanks for the contribution @chenchun! I'm looking forward to trying this out in my own clusters. Out of curiosity, how much less time is spent in pod startup on your cluster now that the init container is bypassed?
It's like 0.3s per container, based on aufs.
when(initContainerComponentsProvider
  .provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids())))
  .thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()), |
I modified this in the merge but you should be using mockitoEq() here.
when(initContainerComponentsProvider
  .provideInitContainerConfigMapBuilder(None))
  .thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++ |
Same as above; use mockitoEq().
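For reference, a sketch of the matcher style being asked for, assuming the suite imports Mockito's eq matcher under Spark's usual alias (the stubbed return value is a stand-in, not the suite's actual fixture):

import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito.when

when(initContainerComponentsProvider.provideInitContainerBundle(
    mockitoEq(None),
    mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)))
  .thenReturn(Some(initContainerBundle)) // stand-in for the suite's fixture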
@ifilonenko so does this require a followup commit?
override def provideInitContainerBundle(
    maybeSubmittedResourceIds: Option[SubmittedResourceIds],
    uris: Iterable[String]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver() |
This line isn't used; no need to build this out. I removed it in my merge.
@ash211 I refactored parts of the testing environment and accounted for pySparkFiles in my merge in PR-351. I also took out the unnecessary line in the DriverInitComponentImpl which initialized a FileResolver that wasn't used. So this will not require a followup commit, as I fixed the issues addressed above; it's just for the sake of bookkeeping.
Bypass init-containers if spark.jars and spark.files is empty or only has local:// URIs. Fixes #338