Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retries #2241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b0375a8
Attempt to requeue after correct period
Tom-Newton Oct 9, 2024
6c53057
Syntactically correct
Tom-Newton Oct 9, 2024
a468bba
I think correct requeueing
Tom-Newton Oct 9, 2024
127f261
Same treatment for the other retries
Tom-Newton Oct 9, 2024
6bb6c74
Tidy
Tom-Newton Oct 9, 2024
0d8e946
Requeue after deleting resources
Tom-Newton Oct 9, 2024
d2739a9
Try to fix submission status updates
Tom-Newton Oct 9, 2024
b2e1699
Tidy
Tom-Newton Oct 9, 2024
9e18dcf
Correct usage of submitSparkApplication
Tom-Newton Oct 9, 2024
b9ee75e
Fix error logging
Tom-Newton Oct 10, 2024
0f86884
Bring back ExecutionAttempts increment that I forgot about
Tom-Newton Oct 10, 2024
413bb53
Log after reconcile complete
Tom-Newton Oct 10, 2024
ff43187
Fix setting submission ID
Tom-Newton Oct 10, 2024
6b5c4bf
Tidy logging
Tom-Newton Oct 10, 2024
8a12c0c
Tidy
Tom-Newton Oct 10, 2024
af81e48
Tidy
Tom-Newton Oct 10, 2024
c683e2a
Update comment
Tom-Newton Oct 13, 2024
8041a12
Start a new test
Tom-Newton Oct 13, 2024
2f32f3b
Working Fails submission and retries until retries are exhausted test
Tom-Newton Oct 13, 2024
9df8df8
Add Application fails and retries until retries are exhausted
Tom-Newton Oct 13, 2024
9ffa5a1
Tidy
Tom-Newton Oct 13, 2024
9b5e2c8
Comments
Tom-Newton Oct 13, 2024
80f74fa
Tidy
Tom-Newton Oct 13, 2024
0f890b3
Move fail configs out of the examples directory
Tom-Newton Oct 13, 2024
2c64edb
Fix lint
Tom-Newton Oct 13, 2024
c62cd54
Move TimeUntilNextRetryDue to `pkg/util/sparkapplication.go`
Tom-Newton Oct 18, 2024
b95d38f
Update internal/controller/sparkapplication/controller.go
Tom-Newton Oct 18, 2024
ce3bb6e
Update test/e2e/sparkapplication_test.go
Tom-Newton Oct 18, 2024
223687f
camelCase
Tom-Newton Oct 18, 2024
f0dc727
make fo-fmt
Tom-Newton Oct 18, 2024
5507800
PR comments
Tom-Newton Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 52 additions & 56 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{Requeue: true}, err
}
logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
defer logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace)
ChenYi015 marked this conversation as resolved.
Show resolved Hide resolved

// Check if the spark application is being deleted
if !app.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -259,17 +260,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
}
app := old.DeepCopy()

if err := r.submitSparkApplication(app); err != nil {
logger.Error(err, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace)
app.Status = v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.ApplicationStateFailedSubmission,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
}
_ = r.submitSparkApplication(app)
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
Expand Down Expand Up @@ -315,6 +306,9 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req

func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName

var result ctrl.Result

retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
Expand All @@ -328,15 +322,22 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
app := old.DeepCopy()

if util.ShouldRetry(app) {
if isNextRetryDue(app) {
timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
if err != nil {
return err
}
if timeUntilNextRetryDue <= 0 {
if r.validateSparkResourceDeletion(ctx, app) {
_ = r.submitSparkApplication(app)
} else {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
}
ChenYi015 marked this conversation as resolved.
Show resolved Hide resolved
return err
return fmt.Errorf("resources associated with SparkApplication name: %s namespace: %s, needed to be deleted", app.Name, app.Namespace)
ChenYi015 marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// If we're waiting before retrying then reconcile will not modify anything, so we need to requeue.
result.RequeueAfter = timeUntilNextRetryDue
}
} else {
app.Status.AppState.State = v1beta2.ApplicationStateFailed
Expand All @@ -352,9 +353,9 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
)
if retryErr != nil {
logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
return ctrl.Result{}, retryErr
return result, retryErr
}
return ctrl.Result{}, nil
return result, nil
}

func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -408,9 +409,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
r.recordSparkApplicationEvent(app)
r.resetSparkApplicationStatus(app)
if err = r.submitSparkApplication(app); err != nil {
logger.Error(err, "Failed to run spark-submit", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
}
_ = r.submitSparkApplication(app)
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
Expand Down Expand Up @@ -497,6 +496,9 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re

func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName

var result ctrl.Result

retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
Expand All @@ -510,12 +512,19 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
app := old.DeepCopy()

if util.ShouldRetry(app) {
if isNextRetryDue(app) {
timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
if err != nil {
return err
}
if timeUntilNextRetryDue <= 0 {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace)
return err
}
app.Status.AppState.State = v1beta2.ApplicationStatePendingRerun
} else {
// If we're waiting before retrying then reconcile will not modify anything, so we need to requeue.
result.RequeueAfter = timeUntilNextRetryDue
}
} else {
app.Status.AppState.State = v1beta2.ApplicationStateFailed
Expand All @@ -528,9 +537,9 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
)
if retryErr != nil {
logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
return ctrl.Result{}, retryErr
return result, retryErr
}
return ctrl.Result{}, nil
return result, nil
}

func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -632,8 +641,28 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa
}

// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error {
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
app.Status.SubmissionID = uuid.New().String()
app.Status.LastSubmissionAttemptTime = metav1.Now()
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1

defer func() {
if submitErr == nil {
app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateSubmitted,
}
app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
} else {
logger.Info("Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State, "error", submitErr)
app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateFailedSubmission,
ErrorMessage: submitErr.Error(),
}
}
r.recordSparkApplicationEvent(app)
}()

if util.PrometheusMonitoringEnabled(app) {
logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace)
Expand Down Expand Up @@ -709,52 +738,19 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error

driverPodName := util.GetDriverPodName(app)
app.Status.DriverInfo.PodName = driverPodName
app.Status.SubmissionID = uuid.New().String()
sparkSubmitArgs, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
}

// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app))
if err != nil {
r.recordSparkApplicationEvent(app)
if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
return fmt.Errorf("failed to run spark-submit: %v", err)
}
if !submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return nil
}

app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateSubmitted,
}
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
app.Status.LastSubmissionAttemptTime = metav1.Now()
r.recordSparkApplicationEvent(app)
return nil
}

// isNextRetryDue reports whether the SparkApplication's next retry should
// happen now. Retries are spaced with a linear back-off: the wait before
// attempt N is N * OnFailureRetryInterval seconds, measured from the last
// submission attempt. Returns false when no interval is configured, no
// attempt has been recorded, or the back-off window has not yet elapsed.
func isNextRetryDue(app *v1beta2.SparkApplication) bool {
	configuredInterval := app.Spec.RestartPolicy.OnFailureRetryInterval
	attempts := app.Status.SubmissionAttempts
	lastAttempt := app.Status.LastSubmissionAttemptTime

	// Without an interval, a prior attempt, or a recorded attempt time there
	// is nothing to schedule against, so a retry is never "due".
	if configuredInterval == nil || attempts <= 0 || lastAttempt.IsZero() {
		return false
	}

	// Linear back-off: the required wait grows with the number of attempts.
	backoff := time.Duration(attempts) * time.Duration(*configuredInterval) * time.Second
	now := time.Now()
	logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", now, backoff))
	return now.After(lastAttempt.Add(backoff))
}

// updateDriverState finds the driver pod of the application
// and updates the driver state based on the current phase of the pod.
func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error {
Expand Down
12 changes: 6 additions & 6 deletions internal/controller/sparkapplication/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func newSubmission(args []string, app *v1beta2.SparkApplication) *submission {
}
}

func runSparkSubmit(submission *submission) (bool, error) {
func runSparkSubmit(submission *submission) error {
sparkHome, present := os.LookupEnv(common.EnvSparkHome)
if !present {
return false, fmt.Errorf("env %s is not specified", common.EnvSparkHome)
return fmt.Errorf("env %s is not specified", common.EnvSparkHome)
}
command := filepath.Join(sparkHome, "bin", "spark-submit")
cmd := exec.Command(command, submission.args...)
Expand All @@ -58,14 +58,14 @@ func runSparkSubmit(submission *submission) (bool, error) {
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, common.ErrorCodePodAlreadyExists) {
return false, fmt.Errorf("driver pod already exist")
return fmt.Errorf("driver pod already exist")
}
if errorMsg != "" {
return false, fmt.Errorf("failed to run spark-submit: %s", errorMsg)
return fmt.Errorf("failed to run spark-submit: %s", errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit: %v", err)
return fmt.Errorf("failed to run spark-submit: %v", err)
}
return true, nil
return nil
}

// buildSparkSubmitArgs builds the arguments for spark-submit.
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/sparkapplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ func ShouldRetry(app *v1beta2.SparkApplication) bool {
return false
}

// TimeUntilNextRetryDue returns how long to wait before the application's next
// retry is due, based on its current failure state and a linear back-off of
// attempts * retryInterval since the last submission attempt. A zero or
// negative duration means the retry is already due. An error is returned when
// the application is not in a retryable configuration: no retry interval is
// configured for its state, no attempt has been recorded, or the last attempt
// time is missing.
func TimeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) {
	var retryInterval *int64
	switch app.Status.AppState.State {
	case v1beta2.ApplicationStateFailedSubmission:
		retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval
	case v1beta2.ApplicationStateFailing:
		retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval
	}

	// Report each failure condition separately; formatting the *int64 pointer
	// itself with %v would print an address rather than the configured value.
	if retryInterval == nil {
		return -1, fmt.Errorf("no retry interval configured for application state %s", app.Status.AppState.State)
	}
	attemptsDone := app.Status.SubmissionAttempts
	lastAttemptTime := app.Status.LastSubmissionAttemptTime
	if lastAttemptTime.IsZero() || attemptsDone <= 0 {
		return -1, fmt.Errorf("invalid last attempt time (%v) or attemptsDone (%v)", lastAttemptTime, attemptsDone)
	}

	// Retry wait time is attempts*RetryInterval to do a linear backoff.
	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
	return interval - time.Since(lastAttemptTime.Time), nil
}

func GetLocalVolumes(app *v1beta2.SparkApplication) map[string]corev1.Volume {
volumes := make(map[string]corev1.Volume)
for _, volume := range app.Spec.Volumes {
Expand Down
44 changes: 44 additions & 0 deletions test/e2e/bad_examples/fail-application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  # Name matches the file; the sibling manifest already uses "fail-submission",
  # and reusing that name here would collide in the same namespace.
  name: fail-application
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  # The main class and jar do not exist, so the application submits
  # successfully but then fails at runtime, exercising OnFailure retries.
  mainClass: non-existent
  mainApplicationFile: local:///non-existent.jar
  sparkVersion: 3.5.2
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 1
  driver:
    labels:
      version: 3.5.2
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.2
    instances: 1
    cores: 1
    memory: 512m
44 changes: 44 additions & 0 deletions test/e2e/bad_examples/fail-submission.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: fail-submission
namespace: default
spec:
type: Scala
mode: cluster
image: spark:3.5.2
imagePullPolicy: IfNotPresent
mainClass: dummy
mainApplicationFile: local:///dummy.jar
sparkVersion: 3.5.2
restartPolicy:
type: OnFailure
onSubmissionFailureRetries: 3
onSubmissionFailureRetryInterval: 1
driver:
labels:
version: 3.5.2
cores: 1
memory: 512m
serviceAccount: non-existent # This is the important part that causes submission to fail.
executor:
labels:
version: 3.5.2
instances: 1
cores: 1
memory: 512m
Loading