From 33aae03b57fa18a55f348d6dc8ed5ead94b481b3 Mon Sep 17 00:00:00 2001 From: FogDong Date: Sun, 16 Apr 2023 01:03:45 +0800 Subject: [PATCH] Fix: fix sync when restarting the application Signed-off-by: FogDong --- pkg/server/domain/service/workflow.go | 82 ++++++++++++++++----------- pkg/server/event/sync/cr2ux.go | 22 +++---- 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/pkg/server/domain/service/workflow.go b/pkg/server/domain/service/workflow.go index ae670ad93..07f6cd9a1 100644 --- a/pkg/server/domain/service/workflow.go +++ b/pkg/server/domain/service/workflow.go @@ -423,33 +423,41 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimary } for _, item := range unfinishedRecords { record := item.(*model.WorkflowRecord) + revision := &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} + if err := w.Store.Get(ctx, revision); err != nil { + if errors.Is(err, datastore.ErrRecordNotExist) { + // If the application revision is not exist, the record do not need be synced + record.Finished = "true" + record.Status = model.RevisionStatusFail + if err := w.Store.Put(ctx, record); err != nil { + return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error()) + } + return bcode.ErrApplicationRevisionNotExist + } + return err + } // sync from application status if record.Name == recordName { - if err := w.syncRecordFromApplicationStatus(ctx, app, record, workflowContext, appPrimaryKey); err != nil { + if err := w.syncRecordFromApplicationStatus(ctx, app, record, revision, workflowContext, appPrimaryKey); err != nil { klog.Errorf("failed to sync workflow record %s from application status %s", record.Name, err.Error()) } continue } - // sync from application revision - if err := w.syncRecordFromApplicationRevision(ctx, record); err != nil { - klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error()) + if revision.RevisionCRName != "" { + // sync from application revision + if err := w.syncRecordFromApplicationRevision(ctx, record, revision); err != nil { + klog.Errorf("failed to sync workflow record %s from application revision %s", record.Name, err.Error()) + } } } return nil } -func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, workflowContext map[string]string, appPrimaryKey string) error { +func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Context, app *v1beta1.Application, record *model.WorkflowRecord, revision *model.ApplicationRevision, workflowContext map[string]string, appPrimaryKey string) error { if app == nil || app.Annotations == nil || app.Status.Workflow == nil { return nil } - var revision = &model.ApplicationRevision{AppPrimaryKey: appPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - return bcode.ErrApplicationRevisionNotExist - } - return err - } if workflowContext != nil { record.ContextValue = workflowContext @@ -489,10 +497,14 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex for i, step := range record.Steps { if s, ok := stepStatus[step.Name]; ok { record.Steps[i].StepStatus = s + } else { + record.Steps[i].StepStatus = resetStepStatus(step.StepStatus) } for j, sub := range step.SubStepsStatus { if s, ok := stepStatus[sub.Name]; ok { record.Steps[i].SubStepsStatus[j] = s + } else { + record.Steps[i].SubStepsStatus[j] = resetStepStatus(sub) } } } @@ -513,6 +525,7 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex revision.RevisionCRName = app.Status.LatestRevision.Name } if err := w.Store.Put(ctx, revision); err != nil { + klog.ErrorS(err, "failed to update application revision status", "revision", revision.Version) return err } @@ -522,20 +535,19 @@ func (w *workflowServiceImpl) syncRecordFromApplicationStatus(ctx context.Contex return nil } -func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord) error { - var revision = &model.ApplicationRevision{AppPrimaryKey: record.AppPrimaryKey, Version: record.RevisionPrimaryKey} - if err := w.Store.Get(ctx, revision); err != nil { - if errors.Is(err, datastore.ErrRecordNotExist) { - // If the application revision is not exist, the record do not need be synced - record.Finished = "true" - record.Status = model.RevisionStatusFail - if err := w.Store.Put(ctx, record); err != nil { - return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error()) - } - } - return fmt.Errorf(("failed to get the application revision from database: %s"), err.Error()) - } +func resetStepStatus(status model.StepStatus) model.StepStatus { + status.Phase = "" + status.Message = "" + status.LastExecuteTime = time.Time{} + status.FirstExecuteTime = time.Time{} + return status +} + +func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Context, record *model.WorkflowRecord, revision *model.ApplicationRevision) error { var appRevision v1beta1.ApplicationRevision + if revision.RevisionCRName == "" { + return nil + } if err := w.KubeClient.Get(ctx, types.NamespacedName{Namespace: record.Namespace, Name: revision.RevisionCRName}, &appRevision); err != nil { if apierrors.IsNotFound(err) { klog.Warningf("can't find the application revision %s/%s, set the record status to terminated", revision.RevisionCRName, record.Namespace) @@ -553,19 +565,23 @@ func (w *workflowServiceImpl) syncRecordFromApplicationRevision(ctx context.Cont appRevision.Spec.Application.Status.Workflow.Finished = true appRevision.Spec.Application.Status.Workflow.Terminated = true } + phase := appRevision.Spec.Application.Status.Workflow.Phase + if phase != workflowv1alpha1.WorkflowStateFailed && phase != workflowv1alpha1.WorkflowStateSucceeded { + appRevision.Spec.Application.Status.Workflow.Phase = workflowv1alpha1.WorkflowStateTerminated + } } - return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, appRevision.Status.WorkflowContext, record.AppPrimaryKey) + return w.syncRecordFromApplicationStatus(ctx, &appRevision.Spec.Application, record, revision, appRevision.Status.WorkflowContext, record.AppPrimaryKey) } func generateRevisionStatus(phase workflowv1alpha1.WorkflowRunPhase) string { summaryStatus := model.RevisionStatusRunning - switch { - case phase == workflowv1alpha1.WorkflowStateFailed: - summaryStatus = model.RevisionStatusFail - case phase == workflowv1alpha1.WorkflowStateSucceeded: - summaryStatus = model.RevisionStatusComplete - case phase == workflowv1alpha1.WorkflowStateTerminated: - summaryStatus = model.RevisionStatusTerminated + switch phase { + case workflowv1alpha1.WorkflowStateFailed: + return model.RevisionStatusFail + case workflowv1alpha1.WorkflowStateSucceeded: + return model.RevisionStatusComplete + case workflowv1alpha1.WorkflowStateTerminated: + return model.RevisionStatusTerminated } return summaryStatus } diff --git a/pkg/server/event/sync/cr2ux.go b/pkg/server/event/sync/cr2ux.go index 2ac72068d..793e5d859 100644 --- a/pkg/server/event/sync/cr2ux.go +++ b/pkg/server/event/sync/cr2ux.go @@ -83,10 +83,13 @@ func (c *CR2UX) syncAppCreatedByUX(ctx context.Context, targetApp *v1beta1.Appli if appPrimaryKey == "" { return fmt.Errorf("appName is empty in application %s", targetApp.Name) } - if targetApp.Annotations == nil || targetApp.Annotations[oam.AnnotationPublishVersion] == "" { - klog.Warningf("app %s/%s has no publish version, skip sync workflow status", targetApp.Namespace, targetApp.Name) + var recordName string + if targetApp.Status.Workflow != nil { + recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1) + } else { + klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name) + return nil } - recordName := targetApp.Annotations[oam.AnnotationPublishVersion] if err := c.workflowService.SyncWorkflowRecord(ctx, appPrimaryKey, recordName, targetApp, nil); err != nil { klog.ErrorS(err, "failed to sync workflow status", "oam app name", targetApp.Name, "workflow name", oam.GetPublishVersion(targetApp), "record name", recordName) return err @@ -157,13 +160,12 @@ func (c *CR2UX) syncAppCreatedByCLI(ctx context.Context, targetApp *v1beta1.Appl klog.Infof("application %s/%s revision %s synced successful", targetApp.Name, targetApp.Namespace, syncedVersion) } - recordName := oam.GetPublishVersion(targetApp) - if recordName == "" { - if targetApp.Status.Workflow != nil { - recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1) - } else { - klog.Warningf("app %s/%s has no publish version or revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name) - } + var recordName string + if targetApp.Status.Workflow != nil { + recordName = strings.Replace(targetApp.Status.Workflow.AppRevision, ":", "-", 1) + } else { + klog.Warningf("app %s/%s has no revision in status, skip sync workflow status", targetApp.Namespace, targetApp.Name) + return nil } return c.workflowService.SyncWorkflowRecord(ctx, c.getAppMetaName(ctx, targetApp.Name, targetApp.Namespace), recordName, targetApp, nil) }