This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Bypass init-containers if spark.jars and spark.files is empty or only has local:// URIs #348

Merged

Conversation

chenchun

Fixes #338

@chenchun
Author

@mccheah Would you like to take a look?

@ash211

ash211 commented Jun 16, 2017

Thanks for the contribution @chenchun ! Definitely appreciate seeing more and more people using this project!

For this PR, at first glance it looks like the scalastyle check is failing because one of the lines is too long; see http://spark-k8s-jenkins.pepperdata.org:8080/job/PR-spark-k8s-full-build/569/consoleFull#1143296497853a0453-9a85-4740-a867-694552c49a93. Would you mind updating the PR to pass scalastyle? You can run it locally with ./dev/scalastyle

}
}
var podBuilder = basePod
var sparkConfWithExecutorInit = sparkConf

Prefer using val over var where possible.

  if (e.nonEmpty && !e.startsWith("local://")) {
    needInitContainer = true
  }
}

can do something like

val needInitContainer = (resolvedSparkJars ++ resolvedSparkFiles).exists { e =>
  e.nonEmpty && !e.startsWith("local://")
}

needInitContainer = true
}
}
var podBuilder = basePod

Write the logic such that this can be kept as a val. There are a few options, but here's one:

val (sparkConfWithExecutorInit, podBuilderWithInitContainer) = if (needInitContainer) { ... } else (sparkConf, basePod)
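
Spelled out, that suggestion might look like the following sketch; configureExecutorInit and bootstrapInitContainer are hypothetical stand-ins for the existing init-container wiring, not the project's actual method names:

// Branch once and bind both results as vals; the two helpers below are
// illustrative placeholders for the real init-container setup calls.
val (sparkConfWithExecutorInit, podBuilderWithInitContainer) =
  if (needInitContainer) {
    (configureExecutorInit(sparkConf), bootstrapInitContainer(basePod))
  } else {
    (sparkConf, basePod)
  }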

val credentialsSecret = credentialsMounter.createCredentialsSecret()
val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
podWithInitContainer, driverContainer.getName, credentialsSecret)
podBuilder = credentialsMounter.mountDriverKubernetesCredentials(

Again, use val everywhere and don't re-assign here.

@mccheah

mccheah commented Jun 16, 2017

It might be preferable to handle the switch on whether init-containers are needed inside either one of the existing modules or in a new module. See DriverInitContainerComponentsProvider and the modules it creates for some examples. We're trying to avoid the problem we had in the first implementation, where Client.scala grew untenably large because of all the logic that was incrementally added over time.

@@ -168,24 +168,34 @@ private[spark] class Client(
val initContainerConfigMap = initContainerComponentsProvider

And we shouldn't be making a config map if we're not using an init-container. In general we should try to encapsulate everything related to whether or not we need an init-container in one place, so that we only have to make the check once and run everything that relates to that check there.

@chenchun
Author

Thanks for the review. I haven't had a chance to test it yet, but is this the right direction? @mccheah


override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
new ExecutorInitContainerConfigurationImpl(
if (needInitContainer) new ExecutorInitContainerConfigurationImpl(

I think the repeated instances of checking this variable indicate that we need to do some refactoring here. I already had a hunch that we had done too much sharding and fragmentation in this architecture, and this repeated logic seems to confirm that theory. In v1 we had one extreme: a particularly monolithic class hierarchy that was difficult to read because everything was in one place. In this code we appear to have gone to the opposite extreme by trying to make every trait do exactly one thing, which might have been too granular and results in problems like this.

I wonder if you could come up with a design that makes it such that we only have to switch once. One example would be to have a single method in this provider class that creates an init-container "bundle": a case class / struct of all the things that are required to set up one of these init-containers. Then we can have a switch that provides an Option of one of these bundles and either returns all of the components or none of them.

We don't actually have to merge the classes themselves, though in some cases that might make sense. We particularly need to isolate the SparkPodInitContainerBootstrap because it's a component that's re-used for both the driver pod and the executor pods. But there might be other cases where it makes more sense to combine some of the classes.

I'm fairly open to any ideas on how to make this code better, but can you see what can be done here?
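
As a rough, self-contained illustration of the bundle idea (the field types below are stand-ins, not the classes that were eventually merged):

object InitContainerBundles {
  // Everything needed to set up an init-container, grouped in one place.
  // Field types are placeholders for the real components (the k8s ConfigMap,
  // SparkPodInitContainerBootstrap, and the executor init configuration).
  case class InitContainerBundle(
      configMapData: Map[String, String],
      bootstrapImage: String,
      executorInitConf: Map[String, String])

  // Switch exactly once: return every component, or none of them.
  def provideInitContainerBundle(
      resolvedSparkJars: Iterable[String],
      resolvedSparkFiles: Iterable[String]): Option[InitContainerBundle] = {
    val needInitContainer = (resolvedSparkJars ++ resolvedSparkFiles)
      .exists(uri => !uri.startsWith("local://"))
    if (needInitContainer) {
      Some(InitContainerBundle(Map.empty, "spark-init:latest", Map.empty))
    } else {
      None
    }
  }
}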


cc @ifilonenko who has also been working on this code.

Member


The difficulty with this statement is that for PySpark and SparkR there will be files that aren't necessarily SparkJars or SparkFiles... so this logic wouldn't work, specifically given PR 351. As @mccheah has noted, this may require re-structuring. Good to bring up at the next SIG meeting.

val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
private val needInitContainer = (resolvedSparkJars ++ resolvedSparkFiles).exists { e =>
  e.nonEmpty && !e.startsWith("local://")
}
Member


Try to refrain from this check by leveraging the functions already provided here, or, if the use case is specific enough, try adding the function to the Utils for re-usability within the system.
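
A sketch of what that could look like with the existing helper; the signature of KubernetesFileUtils.getNonContainerLocalFiles is inferred from how it is used later in this thread:

// getNonContainerLocalFiles returns the URIs that are not already baked
// into the image, so a non-empty result means the init-container is needed.
val needInitContainer = KubernetesFileUtils.getNonContainerLocalFiles(
  resolvedSparkJars ++ resolvedSparkFiles).nonEmpty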

@chenchun
Author

Thanks for your comments. I think I'm starting to get the hang of Scala. @mccheah @ifilonenko PTAL.

@@ -105,7 +105,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)

override def provideInitContainerConfigMapBuilder(
private def provideInitContainerConfigMapBuilder(
Member

ifilonenko Jun 21, 2017


This is causing compile errors in the unit tests (as seen here in ClientV2Suite). Please modify ClientV2Suite to account for these changes.

val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(
containerLocalizedFilesResolver.resolveSubmittedSparkJars()
Member


Good use of KubernetesFileUtils; however, we are already calling this in Client.scala. Maybe pass those jars down so we don't have to re-run this same computation in multiple places.

maybeSubmittedResourceIds: Option[SubmittedResourceIds]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(
Member


Why local://? What if it includes hdfs://, s3://, or whatnot? @mccheah, is the intention here to bypass only if everything is local://, or if everything is non-file://? In that case the logic here should change to be more than just a check for != "local".


mccheah Jun 21, 2017


If there are HDFS or other remote URIs, we still need the init-container to download them, because while they might not be needed for Spark-specific things, they're still needed for the application itself. For example, if my main class is in a jar stored in HDFS, the jar needs to be downloaded before the JVM can launch at all.


Therefore bypassing is only guaranteed to be safe if all dependencies are located on the Docker image or can afford to be downloaded after the JVM starts running (via SparkContext.addJar).
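
Put differently, the bypass is safe only when every URI uses the local:// scheme; a minimal REPL-style sketch (the helper name is illustrative):

// The init-container can be skipped only when every dependency is already
// baked into the Docker image.
def canBypassInitContainer(uris: Iterable[String]): Boolean =
  uris.forall(_.startsWith("local://"))

canBypassInitContainer(Seq("local:///opt/app.jar"))                       // true
canBypassInitContainer(Seq("local:///opt/app.jar", "hdfs://nn/dep.jar"))  // false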

chenchun force-pushed the pod_launch branch 2 times, most recently from a07409a to bf54fd2 on June 22, 2017 03:46
@chenchun
Author

@ifilonenko Addressed your comments


ash211 left a comment


A couple minor suggestions from me, then I'm good to merge.

@mccheah any last bits from you?

All, let's aim to merge in the next 24 hours.

val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
Some(InitContainerBundle(provideInitContainerConfigMapBuilder(maybeSubmittedResourceIds),

Maybe this should call .build() on provideInitContainerConfigMapBuilder(maybeSubmittedResourceIds), since all clients immediately call .build() on this value.

Author


I'm leaving it as is because it seems @mccheah will do a refactor afterwards.


I think the comment above is still worth doing

Author


Updated.

val initContainerConfigMapSeq = maybeInitContainerConfigMap match {
case Some(configMap) => Seq(configMap)
case None => Seq()
}

does .toSeq work here?

Author


Yes, it works. Updated the PR.
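
For reference, the simplification being discussed is just Option#toSeq:

// Some(configMap).toSeq == Seq(configMap); None.toSeq == Seq()
val initContainerConfigMapSeq = maybeInitContainerConfigMap.toSeq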

@@ -224,7 +232,7 @@ private[spark] class Client(
.watch(loggingPodStatusWatcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
val driverOwnedResources = initContainerConfigMapSeq ++

Inline initContainerConfigMapSeq -- there's no need to declare a variable that's used only once.


mccheah left a comment


Like the idea - I think this actually addresses point 2 on #354 where we should return either all the init-container things or none of them.

@@ -202,4 +201,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
configMapKey,
resourceStagingServerSecretPlugin)
}

override def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],

Move this argument down a line and put each argument on one line.

chenchun force-pushed the pod_launch branch 2 times, most recently from 17fdc74 to c466ef6 on June 23, 2017 02:00
ash211 merged commit 08fe944 into apache-spark-on-k8s:branch-2.1-kubernetes on Jun 23, 2017
@ash211

ash211 commented Jun 23, 2017

Thanks for the contribution @chenchun ! I'm looking forward to trying this out in my own clusters.

Out of curiosity, how much less time is spent in pod startup on your cluster now that the init container is bypassed?

chenchun deleted the pod_launch branch on June 23, 2017 06:24
@chenchun
Author

It's about 0.3s per container, based on aufs.

when(initContainerComponentsProvider
.provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids())))
.thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()),
Member


I modified this in the merge, but you should be using mockitoEq() here.
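
For reference, a sketch of the matcher-based stubbing, using the eq-alias import that Spark's test code conventionally uses; the second argument and the stubbed return value are assumptions, since they are truncated in the diff above:

import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito.when

// Once any Mockito matcher appears in a stubbed call, every argument must
// be a matcher, so raw values are wrapped in mockitoEq().
when(initContainerComponentsProvider.provideInitContainerBundle(
    mockitoEq(Some(SUBMITTED_RESOURCES.ids())),
    mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)))
  .thenReturn(Some(initContainerBundle))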

when(initContainerComponentsProvider
.provideInitContainerConfigMapBuilder(None))
.thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++
Member


Same as above: use mockitoEq().

@ash211

ash211 commented Jun 23, 2017

@ifilonenko so does this require a followup commit?

override def provideInitContainerBundle(
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
Member


This line isn't used, so there's no need to build this out. I removed it in my merge.

@ifilonenko
Member

@ash211 I refactored parts of the testing environment and accounted for pySparkFiles in my merge of PR-351. I also took out the unnecessary line in the DriverInitComponentImpl, which initialized a FileResolver that wasn't used. So this will not require a followup commit, as I fixed the issues addressed above; it's just for the sake of bookkeeping.

foxish pushed a commit that referenced this pull request Jul 24, 2017
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019