Merged Lightbend custom-k8s-2.4.0-rc-3
yuchaoran2011 committed May 7, 2019
2 parents c3e32bf + 3b32a37 commit 38938bc
Showing 42 changed files with 1,436 additions and 72 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,6 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.status.{AppStatusSource, AppStatusStore}
import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage._
@@ -417,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
val appStatusSource = AppStatusSource.createSource(conf)
_statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
listenerBus.addToStatusQueue(_statusStore.listener.get)

// Create the Spark execution environment (cache, map output tracker, etc)
@@ -563,7 +565,7 @@
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

appStatusSource.foreach(_env.metricsSystem.registerSource(_))
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -44,6 +44,7 @@ private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
appStatusSource: Option[AppStatusSource] = None,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

import config._
@@ -287,6 +288,11 @@ private[spark] class AppStatusListener(
liveExecutors.values.foreach { exec =>
if (exec.hostname == host) {
exec.isBlacklisted = blacklisted
if (blacklisted) {
appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc())
} else {
appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc())
}
liveUpdate(exec, now)
}
}
@@ -378,11 +384,32 @@
}

job.status = event.jobResult match {
case JobSucceeded => JobExecutionStatus.SUCCEEDED
case JobFailed(_) => JobExecutionStatus.FAILED
case JobSucceeded =>
appStatusSource.foreach{_.SUCCEEDED_JOBS.inc()}
JobExecutionStatus.SUCCEEDED
case JobFailed(_) =>
appStatusSource.foreach{_.FAILED_JOBS.inc()}
JobExecutionStatus.FAILED
}

job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
for {
source <- appStatusSource
submissionTime <- job.submissionTime
completionTime <- job.completionTime
} {
source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime())
}
// update global app status counters
appStatusSource.foreach { source =>
source.COMPLETED_STAGES.inc(job.completedStages.size)
source.FAILED_STAGES.inc(job.failedStages)
source.COMPLETED_TASKS.inc(job.completedTasks)
source.FAILED_TASKS.inc(job.failedTasks)
source.KILLED_TASKS.inc(job.killedTasks)
source.SKIPPED_TASKS.inc(job.skippedTasks)
source.SKIPPED_STAGES.inc(job.skippedStages.size)
}
update(job, now, last = true)
if (job.status == JobExecutionStatus.SUCCEEDED) {
appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
85 changes: 85 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status

import java.util.concurrent.atomic.AtomicLong

import AppStatusSource.getCounter
import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.metrics.source.Source

private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
override def getValue: Long = value.get()
}

private[spark] class AppStatusSource extends Source {

override implicit val metricRegistry = new MetricRegistry()

override val sourceName = "appStatus"

val jobDuration = new JobDuration(new AtomicLong(0L))

// Duration of each job in milliseconds
val JOB_DURATION = metricRegistry
.register(MetricRegistry.name("jobDuration"), jobDuration)

val FAILED_STAGES = getCounter("stages", "failedStages")

val SKIPPED_STAGES = getCounter("stages", "skippedStages")

val COMPLETED_STAGES = getCounter("stages", "completedStages")

val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")

val FAILED_JOBS = getCounter("jobs", "failedJobs")

val COMPLETED_TASKS = getCounter("tasks", "completedTasks")

val FAILED_TASKS = getCounter("tasks", "failedTasks")

val KILLED_TASKS = getCounter("tasks", "killedTasks")

val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")

val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")

val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
}

private[spark] object AppStatusSource {

def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
metricRegistry.counter (MetricRegistry.name (prefix, name) )
}

def createSource(conf: SparkConf): Option[AppStatusSource] = {
Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
.filter(identity)
.map {_ => new AppStatusSource()}
}

val APP_STATUS_METRICS_ENABLED =
ConfigBuilder("spark.app.status.metrics.enabled")
.doc("Whether Dropwizard/Codahale metrics " +
"will be reported for the status of the running spark app.")
.booleanConf
.createWithDefault(false)
}
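
Usage note (not part of the diff): the appStatus source above is disabled by default. The following is a minimal sketch of turning it on from application code, assuming a Spark build that includes this commit; apart from the spark.app.status.metrics.enabled key and the metric names defined above, everything here is ordinary Spark API.

// Minimal sketch, assuming this patch is on the classpath.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("app-status-metrics-demo")
  // Key defined by AppStatusSource.APP_STATUS_METRICS_ENABLED above; defaults to false.
  .set("spark.app.status.metrics.enabled", "true")

val sc = new SparkContext(conf)
sc.parallelize(1 to 100).count()  // one successful job, so the jobs.succeededJobs counter should increment
// With the source registered, the counters and the jobDuration gauge live in the driver's
// MetricRegistry and are reported through whatever metrics sinks are configured
// (for example the driver's default /metrics/json servlet).
sc.stop()

Metric names are scoped under the appStatus source name, e.g. jobs.succeededJobs, stages.failedStages, and the jobDuration gauge registered above.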
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -532,10 +532,11 @@ private[spark] object AppStatusStore {
/**
* Create an in-memory store for a live application.
*/
def createLiveStore(conf: SparkConf): AppStatusStore = {
def createLiveStore(
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
new AppStatusStore(store, listener = Some(listener))
}

}
core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -61,7 +61,7 @@ private[spark] abstract class LiveEntity {
private class LiveJob(
val jobId: Int,
name: String,
submissionTime: Option[Date],
val submissionTime: Option[Date],
val stageIds: Seq[Int],
jobGroup: Option[String],
numTasks: Int) extends LiveEntity {
7 changes: 7 additions & 0 deletions docs/running-on-kubernetes.md
@@ -675,6 +675,13 @@ specific to Spark on Kubernetes.
false, the launcher has a "fire-and-forget" behavior when launching the Spark job.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.deleteOnTermination</code></td>
<td>true</td>
<td>
Specify whether executor pods should be deleted in case of failure or normal termination.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.report.interval</code></td>
<td><code>1s</code></td>
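Usage note (not part of the diff): spark.kubernetes.executor.deleteOnTermination defaults to true, which keeps the old behavior of cleaning up executor pods. A hedged sketch of opting out, e.g. to inspect failed executor pods with kubectl, follows; the master URL and container image are placeholders, and only the deleteOnTermination key comes from this commit.

// Sketch only: keep executor pods around after failure or normal termination.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("keep-executor-pods-demo")
  .setMaster("k8s://https://kubernetes.example.com:6443")  // placeholder API server address
  .set("spark.kubernetes.container.image", "registry.example.com/spark:2.4.0")  // placeholder image
  // New switch from this commit: executor pods are left in place when they terminate, so they
  // can be inspected with kubectl; the driver still removes the executors from Spark itself.
  .set("spark.kubernetes.executor.deleteOnTermination", "false")

In practice the same keys are usually passed to spark-submit as --conf flags rather than set programmatically.
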
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -235,6 +235,13 @@ private[spark] object Config extends Logging {
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

val KUBERNETES_DELETE_EXECUTORS =
ConfigBuilder("spark.kubernetes.executor.deleteOnTermination")
.doc("If set to false then executor pods will not be deleted in case " +
"of failure or normal termination.")
.booleanConf
.createWithDefault(true)

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -51,6 +51,8 @@ private[spark] class ExecutorPodsAllocator(
private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)

private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

private val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.withName(name)
@@ -86,13 +88,15 @@
s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
" previous allocation attempt tried to create it. The executor may have been" +
" deleted but the application missed the deletion event.")
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
.delete()
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
.delete()
}
}
newlyCreatedExecutors -= execId
} else {
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.util.Utils

private[spark] class ExecutorPodsLifecycleManager(
conf: SparkConf,
val conf: SparkConf,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -43,6 +43,8 @@

private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)

private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
snapshotsStore.addSubscriber(eventProcessingInterval) {
onNewSnapshots(schedulerBackend, _)
@@ -111,8 +113,10 @@
execId: Long,
schedulerBackend: KubernetesClusterSchedulerBackend,
execIdsRemovedInRound: mutable.Set[Long]): Unit = {
removeExecutorFromK8s(podState.pod)
removeExecutorFromSpark(schedulerBackend, podState, execId)
if (shouldDeleteExecutors) {
removeExecutorFromK8s(podState.pod)
}
execIdsRemovedInRound += execId
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -53,6 +53,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)

private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
@@ -95,14 +97,14 @@
pollEvents.stop()
}

Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
}
}

Utils.tryLogNonFatalError {
ThreadUtils.shutdown(requestExecutorsService)
}
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -22,14 +22,15 @@ import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.Mockito.{mock, never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@@ -103,6 +104,17 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}

test("Keep executor pods in k8s if configured.") {
val failedPod = failedExecutorWithoutDeletion(1)
eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(podOperations, never()).delete()
}

private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
s"""
|The executor with id $failedExecutorId exited with exit code 1.