Skip to content

Commit

Permalink
Robustness to driver pod taking time to create (#2315)
Browse files Browse the repository at this point in the history
* Retry after driver pod now found if recent submission

Signed-off-by: Thomas Newton <[email protected]>

* Add a test

Signed-off-by: Thomas Newton <[email protected]>

* Make grace period configurable

Signed-off-by: Thomas Newton <[email protected]>

* Update test

Signed-off-by: Thomas Newton <[email protected]>

* Add an extra test with the driver pod

Signed-off-by: Thomas Newton <[email protected]>

* Separate context to create and delete the driver pod

Signed-off-by: Thomas Newton <[email protected]>

* Tidy

Signed-off-by: Thomas Newton <[email protected]>

* Autoformat

Signed-off-by: Thomas Newton <[email protected]>

* Update error message

Signed-off-by: Thomas Newton <[email protected]>

* Add helm paramater

Signed-off-by: Thomas Newton <[email protected]>

* Update internal/controller/sparkapplication/controller.go

Co-authored-by: Yi Chen <[email protected]>
Signed-off-by: Thomas Newton <[email protected]>

* Newlines between helm tests

Signed-off-by: Thomas Newton <[email protected]>

---------

Signed-off-by: Thomas Newton <[email protected]>
Co-authored-by: Yi Chen <[email protected]>
  • Loading branch information
Tom-Newton and ChenYi015 authored Dec 4, 2024
1 parent a261523 commit d815e78
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 12 deletions.
1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.replicas | int | `1` | Number of replicas of controller. |
| controller.workers | int | `10` | Reconcile concurrency, higher values might increase memory usage. |
| controller.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. |
| controller.driverPodCreationGracePeriod | string | `"10s"` | Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created. |
| controller.maxTrackedExecutorPerApp | int | `1000` | Specifies the maximum number of Executor pods that can be tracked by the controller per SparkApplication. |
| controller.uiService.enable | bool | `true` | Specifies whether to create service for Spark web UI. |
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ spec:
{{- if .Values.controller.workqueueRateLimiter.maxDelay.enable }}
- --workqueue-ratelimiter-max-delay={{ .Values.controller.workqueueRateLimiter.maxDelay.duration }}
{{- end }}
{{- if .Values.controller.driverPodCreationGracePeriod }}
- --driver-pod-creation-grace-period={{ .Values.controller.driverPodCreationGracePeriod }}
{{- end }}
{{- if .Values.controller.maxTrackedExecutorPerApp }}
- --max-tracked-executor-per-app={{ .Values.controller.maxTrackedExecutorPerApp }}
{{- end }}
Expand Down
10 changes: 10 additions & 0 deletions charts/spark-operator-chart/tests/controller/deployment_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,16 @@ tests:
- notContains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-max-delay=1h

- it: Should contain `driver-pod-creation-grace-period` arg if `controller.driverPodCreationGracePeriod` is set
set:
controller:
driverPodCreationGracePeriod: 30s
asserts:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --driver-pod-creation-grace-period=30s

- it: Should contain `--max-tracked-executor-per-app` arg if `controller.maxTrackedExecutorPerApp` is set
set:
controller:
Expand Down
3 changes: 3 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ controller:
# -- Configure the verbosity of logging, can be one of `debug`, `info`, `error`.
logLevel: info

# -- Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.
driverPodCreationGracePeriod: 10s

# -- Specifies the maximum number of Executor pods that can be tracked by the controller per SparkApplication.
maxTrackedExecutorPerApp: 1000

Expand Down
21 changes: 13 additions & 8 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ var (
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration

driverPodCreationGracePeriod time.Duration

// Metrics
enableMetrics bool
metricsBindAddress string
Expand Down Expand Up @@ -163,6 +165,8 @@ func NewStartCommand() *cobra.Command {
command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.")

command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.")

command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.")
command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+
"Use the port :8080. If not set, it will be 0 in order to disable the metrics server")
Expand Down Expand Up @@ -394,14 +398,15 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
sparkExecutorMetrics.Register()
}
options := sparkapplication.Options{
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
DriverPodCreationGracePeriod: driverPodCreationGracePeriod,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
Expand Down
13 changes: 9 additions & 4 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Options struct {
IngressURLFormat string
DefaultBatchScheduler string

DriverPodCreationGracePeriod time.Duration

KubeSchedulerNames []string

SparkApplicationMetrics *metrics.SparkApplicationMetrics
Expand Down Expand Up @@ -773,10 +775,13 @@ func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkAppl
}

if driverPod == nil {
app.Status.AppState.State = v1beta2.ApplicationStateFailing
app.Status.AppState.ErrorMessage = "driver pod not found"
app.Status.TerminationTime = metav1.Now()
return nil
if app.Status.AppState.State != v1beta2.ApplicationStateSubmitted || metav1.Now().Sub(app.Status.LastSubmissionAttemptTime.Time) > r.options.DriverPodCreationGracePeriod {
app.Status.AppState.State = v1beta2.ApplicationStateFailing
app.Status.AppState.ErrorMessage = "driver pod not found"
app.Status.TerminationTime = metav1.Now()
return nil
}
return fmt.Errorf("driver pod not found, while inside the grace period. Grace period of %v expires at %v", r.options.DriverPodCreationGracePeriod, app.Status.LastSubmissionAttemptTime.Add(r.options.DriverPodCreationGracePeriod))
}

app.Status.SparkApplicationID = util.GetSparkApplicationID(driverPod)
Expand Down
143 changes: 143 additions & 0 deletions internal/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,149 @@ var _ = Describe("SparkApplication Controller", func() {
})
})

Context("When reconciling a submitted SparkApplication with no driver pod", func() {
ctx := context.Background()
appName := "test"
appNamespace := "default"
key := types.NamespacedName{
Name: appName,
Namespace: appNamespace,
}

BeforeEach(func() {
By("Creating a test SparkApplication")
app := &v1beta2.SparkApplication{}
if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
app = &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: appNamespace,
},
Spec: v1beta2.SparkApplicationSpec{
MainApplicationFile: util.StringPtr("local:///dummy.jar"),
},
}
v1beta2.SetSparkApplicationDefaults(app)
Expect(k8sClient.Create(ctx, app)).To(Succeed())

app.Status.AppState.State = v1beta2.ApplicationStateSubmitted
app.Status.DriverInfo.PodName = "non-existent-driver"
app.Status.LastSubmissionAttemptTime = metav1.NewTime(time.Now())
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())
}
})

AfterEach(func() {
app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

By("Deleting the created test SparkApplication")
Expect(k8sClient.Delete(ctx, app)).To(Succeed())
})

It("Should requeue submitted SparkApplication when driver pod not found inside the grace period", func() {
By("Reconciling the created test SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 10 * time.Second},
)
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).To(MatchError(ContainSubstring("driver pod not found, while inside the grace period. Grace period of")))
app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateSubmitted))
})

It("Should fail a SparkApplication when driver pod not found outside the grace period", func() {
By("Reconciling the created test SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailing))
})
})

Context("When reconciling a SparkApplication with driver pod", func() {
ctx := context.Background()
appName := "test"
appNamespace := "default"
key := types.NamespacedName{
Name: appName,
Namespace: appNamespace,
}

BeforeEach(func() {
By("Creating a test SparkApplication")
app := &v1beta2.SparkApplication{}
if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
app = &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: appNamespace,
},
Spec: v1beta2.SparkApplicationSpec{
MainApplicationFile: util.StringPtr("local:///dummy.jar"),
},
}
v1beta2.SetSparkApplicationDefaults(app)
Expect(k8sClient.Create(ctx, app)).To(Succeed())

app.Status.AppState.State = v1beta2.ApplicationStateSubmitted
driverPod := createDriverPod(appName, appNamespace)
Expect(k8sClient.Create(ctx, driverPod)).To(Succeed())
app.Status.DriverInfo.PodName = driverPod.Name
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())
}
})

AfterEach(func() {
app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

By("Deleting the created test SparkApplication")
Expect(k8sClient.Delete(ctx, app)).To(Succeed())

By("Deleting the driver pod")
driverPod := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getDriverNamespacedName(appName, appNamespace), driverPod)).To(Succeed())
Expect(k8sClient.Delete(ctx, driverPod)).To(Succeed())
})

It("When reconciling a submitted SparkApplication when driver pod exists", func() {
By("Reconciling the created test SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateSubmitted))
})
})

Context("When reconciling a completed SparkApplication", func() {
ctx := context.Background()
appName := "test"
Expand Down

0 comments on commit d815e78

Please sign in to comment.