From 2549d23722c515b00832a3bc3b1fa07a44f1408b Mon Sep 17 00:00:00 2001 From: TzZtzt Date: Thu, 10 Aug 2023 09:55:33 +0800 Subject: [PATCH] feature: support dataflow (#3384) * Move OperationType to api package Signed-off-by: trafalgarzzz * Init data operation API Signed-off-by: trafalgarzzz * Implement framework for DataFlow reconciler Signed-off-by: trafalgarzzz * Implement dataflow (DataLoad kind) Signed-off-by: trafalgarzzz * Implement dataflow Signed-off-by: trafalgarzzz * Generate openapi Signed-off-by: trafalgarzzz --------- Signed-off-by: trafalgarzzz --- api/v1alpha1/common.go | 29 +++ api/v1alpha1/databackup_types.go | 3 + api/v1alpha1/dataload_types.go | 4 + api/v1alpha1/datamigrate_types.go | 4 + api/v1alpha1/dataprocess_types.go | 4 + api/v1alpha1/openapi_generated.go | 99 +++++++++- api/v1alpha1/status.go | 2 + api/v1alpha1/zz_generated.deepcopy.go | 56 ++++++ .../fluid/crds/data.fluid.io_databackups.yaml | 31 ++++ .../fluid/crds/data.fluid.io_dataloads.yaml | 31 ++++ .../crds/data.fluid.io_datamigrates.yaml | 31 ++++ .../crds/data.fluid.io_dataprocesses.yaml | 31 ++++ cmd/dataset/app/dataset.go | 10 + .../crd/bases/data.fluid.io_databackups.yaml | 31 ++++ config/crd/bases/data.fluid.io_dataloads.yaml | 31 ++++ .../crd/bases/data.fluid.io_datamigrates.yaml | 31 ++++ .../bases/data.fluid.io_dataprocesses.yaml | 31 ++++ pkg/common/constants.go | 7 + .../v1alpha1/databackup/implement.go | 28 +-- .../v1alpha1/dataflow/dataflow_controller.go | 140 ++++++++++++++ .../v1alpha1/dataflow/operations.go | 175 ++++++++++++++++++ .../v1alpha1/dataload/implement.go | 4 +- .../v1alpha1/datamigrate/implement.go | 4 +- .../v1alpha1/dataprocess/implement.go | 4 +- pkg/dataoperation/context.go | 16 +- pkg/dataoperation/interface.go | 2 +- pkg/ddc/alluxio/operate.go | 7 +- pkg/ddc/base/operation.go | 19 +- pkg/ddc/base/operation_helm.go | 4 +- pkg/ddc/efc/operate.go | 3 +- pkg/ddc/goosefs/operate.go | 5 +- pkg/ddc/jindo/operate.go | 3 +- pkg/ddc/jindocache/operate.go | 3 
+- pkg/ddc/jindofsx/operate.go | 5 +- pkg/ddc/juicefs/operate.go | 7 +- pkg/ddc/thin/operate.go | 3 +- pkg/utils/dataoperation.go | 73 ++++++++ 37 files changed, 920 insertions(+), 51 deletions(-) create mode 100644 pkg/controllers/v1alpha1/dataflow/dataflow_controller.go create mode 100644 pkg/controllers/v1alpha1/dataflow/operations.go diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index f110c70c815..358caaf5abc 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -233,3 +233,32 @@ type Condition struct { // LastTransitionTime describes last time the condition transitioned from one status to another. LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` } + +type OperationType string + +const ( + DataLoadType OperationType = "DataLoad" + DataBackupType OperationType = "DataBackup" + DataMigrateType OperationType = "DataMigrate" + DataProcessType OperationType = "DataProcess" +) + +type OperationRef struct { + // OperationKind specifies the type of the data operation + // +required + // +kubebuilder:validation:Enum=DataLoad;DataBackup;DataMigrate;DataProcess + OperationKind OperationType `json:"operationKind"` + + // Name specifies the name of the referred data operation + // +required + Name string `json:"name"` + + // Namespace specifies the namespace of the referred data operation + // +required + Namespace string `json:"namespace"` +} + +type WaitingStatus struct { + // OperationComplete indicates if the preceding operation is complete + OperationComplete *bool `json:"operationComplete,omitempty"` +} diff --git a/api/v1alpha1/databackup_types.go b/api/v1alpha1/databackup_types.go index 261759bd057..ec279147713 100644 --- a/api/v1alpha1/databackup_types.go +++ b/api/v1alpha1/databackup_types.go @@ -30,6 +30,9 @@ type DataBackupSpec struct { BackupPath string `json:"backupPath,omitempty"` // Manage the user to run Alluxio DataBackup RunAs *User `json:"runAs,omitempty"` + // Specifies that the preceding operation in a 
workflow + // +optional + RunAfter *OperationRef `json:"runAfter,omitempty"` } // +kubebuilder:printcolumn:name="Dataset",type="string",JSONPath=`.spec.dataset` diff --git a/api/v1alpha1/dataload_types.go b/api/v1alpha1/dataload_types.go index bbc077d023d..4ad5cc49dd7 100644 --- a/api/v1alpha1/dataload_types.go +++ b/api/v1alpha1/dataload_types.go @@ -85,6 +85,10 @@ type DataLoadSpec struct { // The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron. // +optional Schedule string `json:"schedule,omitempty"` + + // Specifies that the preceding operation in a workflow + // +optional + RunAfter *OperationRef `json:"runAfter,omitempty"` } // +kubebuilder:printcolumn:name="Dataset",type="string",JSONPath=`.spec.dataset.name` diff --git a/api/v1alpha1/datamigrate_types.go b/api/v1alpha1/datamigrate_types.go index 31e49b34ef7..1f8ec13d9b1 100644 --- a/api/v1alpha1/datamigrate_types.go +++ b/api/v1alpha1/datamigrate_types.go @@ -76,6 +76,10 @@ type DataMigrateSpec struct { // +optional // SchedulerName sets the scheduler to be used for DataLoad pod SchedulerName string `json:"schedulerName,omitempty"` + + // Specifies that the preceding operation in a workflow + // +optional + RunAfter *OperationRef `json:"runAfter,omitempty"` } type DataToMigrate struct { diff --git a/api/v1alpha1/dataprocess_types.go b/api/v1alpha1/dataprocess_types.go index 3fa85f03cfb..b000a302fc3 100644 --- a/api/v1alpha1/dataprocess_types.go +++ b/api/v1alpha1/dataprocess_types.go @@ -94,6 +94,10 @@ type DataProcessSpec struct { // Processor specify how to process data. 
// +required Processor Processor `json:"processor"` + + // Specifies that the preceding operation in a workflow + // +optional + RunAfter *OperationRef `json:"runAfter,omitempty"` } // +kubebuilder:printcolumn:name="Dataset",type="string",JSONPath=`.spec.dataset.name` diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 40063c23efc..e138b654586 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -90,6 +90,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.MetadataSyncPolicy": schema_fluid_cloudnative_fluid_api_v1alpha1_MetadataSyncPolicy(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount": schema_fluid_cloudnative_fluid_api_v1alpha1_Mount(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OSAdvise": schema_fluid_cloudnative_fluid_api_v1alpha1_OSAdvise(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata": schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor": schema_fluid_cloudnative_fluid_api_v1alpha1_Processor(ref), @@ -115,6 +116,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.User": schema_fluid_cloudnative_fluid_api_v1alpha1_User(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.VersionSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_VersionSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.VolumeSource": schema_fluid_cloudnative_fluid_api_v1alpha1_VolumeSource(ref), + 
"github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_WaitingStatus(ref), } } @@ -971,11 +973,17 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataBackupSpec(ref common.Refer Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.User"), }, }, + "runAfter": { + SchemaProps: spec.SchemaProps{ + Description: "Specifies that the preceding operation in a workflow", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.User"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef", "github.com/fluid-cloudnative/fluid/api/v1alpha1.User"}, } } @@ -1190,11 +1198,17 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataLoadSpec(ref common.Referen Format: "", }, }, + "runAfter": { + SchemaProps: spec.SchemaProps{ + Description: "Specifies that the preceding operation in a workflow", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetDataset", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetPath", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Toleration"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef", "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetDataset", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetPath", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Toleration"}, } } @@ -1430,12 +1444,18 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataMigrateSpec(ref common.Refe Format: "", }, }, + "runAfter": { + SchemaProps: spec.SchemaProps{ + Description: "Specifies that the preceding operation in a workflow", + Ref: 
ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef"), + }, + }, }, Required: []string{"from", "to"}, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate", "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Toleration"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.DataToMigrate", "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef", "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Toleration"}, } } @@ -1556,12 +1576,18 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataProcessSpec(ref common.Refe Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor"), }, }, + "runAfter": { + SchemaProps: spec.SchemaProps{ + Description: "Specifies that the preceding operation in a workflow", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef"), + }, + }, }, Required: []string{"dataset", "processor"}, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetDatasetWithMountPath"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef", "github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor", "github.com/fluid-cloudnative/fluid/api/v1alpha1.TargetDatasetWithMountPath"}, } } @@ -4478,6 +4504,43 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_OSAdvise(ref common.ReferenceCa } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "operationKind": { + SchemaProps: spec.SchemaProps{ + Description: "OperationKind specifies the type of the data operation", + Default: "", + Type: 
[]string{"string"}, + Format: "", + }, + }, + "name": { + SchemaProps: spec.SchemaProps{ + Description: "Name specifies the name of the referred data operation", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "namespace": { + SchemaProps: spec.SchemaProps{ + Description: "Namespace specifies the namespace of the referred data operation", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"operationKind", "name", "namespace"}, + }, + }, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -4543,12 +4606,19 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref common.Refe Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, + "waitingFor": { + SchemaProps: spec.SchemaProps{ + Description: "WaitingStatus stores information about waiting operation.", + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus"), + }, + }, }, Required: []string{"phase", "duration", "conditions"}, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.Condition", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Condition", "github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } @@ -6195,3 +6265,22 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_VolumeSource(ref common.Referen "k8s.io/api/core/v1.AWSElasticBlockStoreVolumeSource", "k8s.io/api/core/v1.AzureDiskVolumeSource", "k8s.io/api/core/v1.AzureFileVolumeSource", "k8s.io/api/core/v1.CSIVolumeSource", "k8s.io/api/core/v1.CephFSVolumeSource", "k8s.io/api/core/v1.CinderVolumeSource", "k8s.io/api/core/v1.ConfigMapVolumeSource", "k8s.io/api/core/v1.DownwardAPIVolumeSource", "k8s.io/api/core/v1.EmptyDirVolumeSource", 
"k8s.io/api/core/v1.EphemeralVolumeSource", "k8s.io/api/core/v1.FCVolumeSource", "k8s.io/api/core/v1.FlexVolumeSource", "k8s.io/api/core/v1.FlockerVolumeSource", "k8s.io/api/core/v1.GCEPersistentDiskVolumeSource", "k8s.io/api/core/v1.GitRepoVolumeSource", "k8s.io/api/core/v1.GlusterfsVolumeSource", "k8s.io/api/core/v1.HostPathVolumeSource", "k8s.io/api/core/v1.ISCSIVolumeSource", "k8s.io/api/core/v1.NFSVolumeSource", "k8s.io/api/core/v1.PersistentVolumeClaimVolumeSource", "k8s.io/api/core/v1.PhotonPersistentDiskVolumeSource", "k8s.io/api/core/v1.PortworxVolumeSource", "k8s.io/api/core/v1.ProjectedVolumeSource", "k8s.io/api/core/v1.QuobyteVolumeSource", "k8s.io/api/core/v1.RBDVolumeSource", "k8s.io/api/core/v1.ScaleIOVolumeSource", "k8s.io/api/core/v1.SecretVolumeSource", "k8s.io/api/core/v1.StorageOSVolumeSource", "k8s.io/api/core/v1.VsphereVirtualDiskVolumeSource"}, } } + +func schema_fluid_cloudnative_fluid_api_v1alpha1_WaitingStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "operationComplete": { + SchemaProps: spec.SchemaProps{ + Description: "OperationComplete indicates if the preceding operation is complete", + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + }, + }, + } +} diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index dce93dcd329..5113b6d4ef8 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -146,6 +146,8 @@ type OperationStatus struct { LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"` // LastSuccessfulTime is the last time the cron operation successfully completed LastSuccessfulTime *metav1.Time `json:"lastSuccessfulTime,omitempty"` + // WaitingStatus stores information about waiting operation. 
+ WaitingFor WaitingStatus `json:"waitingFor,omitempty"` } type RuntimePhase string diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 86a20b6a9ec..267a8eedd37 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -407,6 +407,11 @@ func (in *DataBackupSpec) DeepCopyInto(out *DataBackupSpec) { *out = new(User) (*in).DeepCopyInto(*out) } + if in.RunAfter != nil { + in, out := &in.RunAfter, &out.RunAfter + *out = new(OperationRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataBackupSpec. @@ -514,6 +519,11 @@ func (in *DataLoadSpec) DeepCopyInto(out *DataLoadSpec) { (*out)[key] = val } } + if in.RunAfter != nil { + in, out := &in.RunAfter, &out.RunAfter + *out = new(OperationRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataLoadSpec. @@ -618,6 +628,11 @@ func (in *DataMigrateSpec) DeepCopyInto(out *DataMigrateSpec) { (*out)[key] = val } } + if in.RunAfter != nil { + in, out := &in.RunAfter, &out.RunAfter + *out = new(OperationRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataMigrateSpec. @@ -694,6 +709,11 @@ func (in *DataProcessSpec) DeepCopyInto(out *DataProcessSpec) { *out = *in out.Dataset = in.Dataset in.Processor.DeepCopyInto(&out.Processor) + if in.RunAfter != nil { + in, out := &in.RunAfter, &out.RunAfter + *out = new(OperationRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataProcessSpec. @@ -1981,6 +2001,21 @@ func (in *OSAdvise) DeepCopy() *OSAdvise { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *OperationRef) DeepCopyInto(out *OperationRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationRef. +func (in *OperationRef) DeepCopy() *OperationRef { + if in == nil { + return nil + } + out := new(OperationRef) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OperationStatus) DeepCopyInto(out *OperationStatus) { *out = *in @@ -2006,6 +2041,7 @@ func (in *OperationStatus) DeepCopyInto(out *OperationStatus) { in, out := &in.LastSuccessfulTime, &out.LastSuccessfulTime *out = (*in).DeepCopy() } + in.WaitingFor.DeepCopyInto(&out.WaitingFor) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationStatus. @@ -2659,3 +2695,23 @@ func (in *VolumeSource) DeepCopy() *VolumeSource { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WaitingStatus) DeepCopyInto(out *WaitingStatus) { + *out = *in + if in.OperationComplete != nil { + in, out := &in.OperationComplete, &out.OperationComplete + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WaitingStatus. 
+func (in *WaitingStatus) DeepCopy() *WaitingStatus { + if in == nil { + return nil + } + out := new(WaitingStatus) + in.DeepCopyInto(out) + return out +} diff --git a/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml b/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml index f5823736ac6..f4413c370a1 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml @@ -65,6 +65,29 @@ spec: dataset: description: Dataset defines the target dataset of the DataBackup type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object runAs: description: Manage the user to run Alluxio DataBackup properties: @@ -148,6 +171,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml b/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml index ba6c5f59b90..934d3dd9178 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml @@ -926,6 +926,29 @@ spec: - Cron - OnEvent type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object schedule: description: The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron. @@ -1050,6 +1073,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml b/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml index 0cc35df0a6e..dff1c8fc3df 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml @@ -974,6 +974,29 @@ spec: - Cron - OnEvent type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object runtimeType: description: using which runtime to migrate data; if none, take dataset runtime as default @@ -1143,6 +1166,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml index 1a1c00ec6c3..9f9acc8a194 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml @@ -9261,6 +9261,29 @@ spec: of the container type: string type: object + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object required: - dataset - processor @@ -9324,6 +9347,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/cmd/dataset/app/dataset.go b/cmd/dataset/app/dataset.go index 3ee59f02b60..ef1972b6f26 100644 --- a/cmd/dataset/app/dataset.go +++ b/cmd/dataset/app/dataset.go @@ -39,6 +39,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/controllers" databackupctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/databackup" + dataflowctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataflow" dataloadctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataload" datamigratectl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/datamigrate" dataprocessctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/dataprocess" @@ -167,6 +168,15 @@ func handle() { os.Exit(1) } + if err = (dataflowctl.NewDataFlowReconciler(mgr.GetClient(), + ctrl.Log.WithName("dataflowctl"), + mgr.GetEventRecorderFor("DataFlow"), + time.Duration(5*time.Second), + )).SetupWithManager(mgr, controllerOptions); err != nil { + setupLog.Error(err, "unable to create controller") + os.Exit(1) + } + setupLog.Info("starting dataset-controller") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running dataset-controller") diff --git a/config/crd/bases/data.fluid.io_databackups.yaml b/config/crd/bases/data.fluid.io_databackups.yaml index f5823736ac6..f4413c370a1 100644 --- a/config/crd/bases/data.fluid.io_databackups.yaml +++ b/config/crd/bases/data.fluid.io_databackups.yaml @@ -65,6 +65,29 @@ spec: dataset: description: Dataset defines the target dataset of the DataBackup type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + 
type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object runAs: description: Manage the user to run Alluxio DataBackup properties: @@ -148,6 +171,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. + properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/config/crd/bases/data.fluid.io_dataloads.yaml b/config/crd/bases/data.fluid.io_dataloads.yaml index ba6c5f59b90..934d3dd9178 100644 --- a/config/crd/bases/data.fluid.io_dataloads.yaml +++ b/config/crd/bases/data.fluid.io_dataloads.yaml @@ -926,6 +926,29 @@ spec: - Cron - OnEvent type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object schedule: description: The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron. @@ -1050,6 +1073,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/config/crd/bases/data.fluid.io_datamigrates.yaml b/config/crd/bases/data.fluid.io_datamigrates.yaml index 0cc35df0a6e..dff1c8fc3df 100644 --- a/config/crd/bases/data.fluid.io_datamigrates.yaml +++ b/config/crd/bases/data.fluid.io_datamigrates.yaml @@ -974,6 +974,29 @@ spec: - Cron - OnEvent type: string + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object runtimeType: description: using which runtime to migrate data; if none, take dataset runtime as default @@ -1143,6 +1166,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/config/crd/bases/data.fluid.io_dataprocesses.yaml b/config/crd/bases/data.fluid.io_dataprocesses.yaml index 1a1c00ec6c3..9f9acc8a194 100644 --- a/config/crd/bases/data.fluid.io_dataprocesses.yaml +++ b/config/crd/bases/data.fluid.io_dataprocesses.yaml @@ -9261,6 +9261,29 @@ spec: of the container type: string type: object + runAfter: + description: Specifies that the preceding operation in a workflow + properties: + name: + description: Name specifies the name of the referred data operation + type: string + namespace: + description: Namespace specifies the namespace of the referred + data operation + type: string + operationKind: + description: OperationKind specifies the type of the data operation + enum: + - DataLoad + - DataBackup + - DataMigrate + - DataProcess + type: string + required: + - name + - namespace + - operationKind + type: object required: - dataset - processor @@ -9324,6 +9347,14 @@ spec: phase: description: Phase describes current phase of operation type: string + waitingFor: + description: WaitingStatus stores information about waiting operation. 
+ properties: + operationComplete: + description: OperationComplete indicates if the preceding operation + is complete + type: boolean + type: object required: - conditions - duration diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 4d9a4dafc3e..5c470a458b3 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -59,6 +59,13 @@ const ( DataOperationCollision = "DataOperationCollision" ) +// Events related to dataflow +const ( + DataOperationNotFound = "DataOperationNotFound" + + DataOperationWaiting = "DataOperationWaiting" +) + // Events related to DataLoad const ( DataLoadCollision = "DataLoadCollision" diff --git a/pkg/controllers/v1alpha1/databackup/implement.go b/pkg/controllers/v1alpha1/databackup/implement.go index eb79a2ac20e..6034bd84c66 100644 --- a/pkg/controllers/v1alpha1/databackup/implement.go +++ b/pkg/controllers/v1alpha1/databackup/implement.go @@ -31,7 +31,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - "github.com/fluid-cloudnative/fluid/api/v1alpha1" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cdatabackup "github.com/fluid-cloudnative/fluid/pkg/databackup" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -42,7 +42,7 @@ func (r *DataBackupReconciler) GetChartsDirectory() string { } func (r *DataBackupReconciler) UpdateStatusInfoForCompleted(object client.Object, infos map[string]string) error { - dataBackup, ok := object.(*v1alpha1.DataBackup) + dataBackup, ok := object.(*datav1alpha1.DataBackup) if !ok { return fmt.Errorf("object %v is not a DataBackup", object) } @@ -63,16 +63,16 @@ func (r *DataBackupReconciler) UpdateStatusInfoForCompleted(object client.Object return nil } -func (r *DataBackupReconciler) Validate(ctx runtime.ReconcileRequestContext, object client.Object) ([]v1alpha1.Condition, error) { - dataBackup, ok := 
object.(*v1alpha1.DataBackup) +func (r *DataBackupReconciler) Validate(ctx runtime.ReconcileRequestContext, object client.Object) ([]datav1alpha1.Condition, error) { + dataBackup, ok := object.(*datav1alpha1.DataBackup) if !ok { - return []v1alpha1.Condition{}, fmt.Errorf("object %v is not a DataBackup", object) + return []datav1alpha1.Condition{}, fmt.Errorf("object %v is not a DataBackup", object) } // 0. check the supported backup path format if !strings.HasPrefix(dataBackup.Spec.BackupPath, common.PathScheme.String()) && !strings.HasPrefix(dataBackup.Spec.BackupPath, common.VolumeScheme.String()) { err := fmt.Errorf("don't support path in this form, path: %s", dataBackup.Spec.BackupPath) - return []v1alpha1.Condition{ + return []datav1alpha1.Condition{ { Type: common.Failed, Status: v1.ConditionTrue, @@ -86,18 +86,18 @@ func (r *DataBackupReconciler) Validate(ctx runtime.ReconcileRequestContext, obj return nil, nil } -func (r *DataBackupReconciler) SetTargetDatasetStatusInProgress(dataset *v1alpha1.Dataset) { +func (r *DataBackupReconciler) SetTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) { } -func (r *DataBackupReconciler) RemoveTargetDatasetStatusInProgress(dataset *v1alpha1.Dataset) { +func (r *DataBackupReconciler) RemoveTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) { } -func (r *DataBackupReconciler) GetOperationType() dataoperation.OperationType { - return dataoperation.DataBackup +func (r *DataBackupReconciler) GetOperationType() datav1alpha1.OperationType { + return datav1alpha1.DataBackupType } -func (r *DataBackupReconciler) GetTargetDataset(object client.Object) (*v1alpha1.Dataset, error) { - typeObject, ok := object.(*v1alpha1.DataBackup) +func (r *DataBackupReconciler) GetTargetDataset(object client.Object) (*datav1alpha1.Dataset, error) { + typeObject, ok := object.(*datav1alpha1.DataBackup) if !ok { return nil, fmt.Errorf("object %v is not a DataBackup", object) } @@ -115,8 +115,8 @@ func (r *DataBackupReconciler) 
GetReleaseNameSpacedName(object client.Object) ty } // UpdateOperationStatus update the DataBackup Status -func (r *DataBackupReconciler) UpdateOperationApiStatus(object client.Object, opStatus *v1alpha1.OperationStatus) error { - dataBackup, ok := object.(*v1alpha1.DataBackup) +func (r *DataBackupReconciler) UpdateOperationApiStatus(object client.Object, opStatus *datav1alpha1.OperationStatus) error { + dataBackup, ok := object.(*datav1alpha1.DataBackup) if !ok { return fmt.Errorf("%+v is not a type of DataBackup", object) } diff --git a/pkg/controllers/v1alpha1/dataflow/dataflow_controller.go b/pkg/controllers/v1alpha1/dataflow/dataflow_controller.go new file mode 100644 index 00000000000..a168719bb86 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/dataflow_controller.go @@ -0,0 +1,140 @@ +/* +Copyright 2023 The Fluid Author. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package dataflow + +import ( + "context" + "fmt" + "time" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const controllerName string = "DataFlowReconciler" + +type DataFlowReconciler struct { + client.Client + Recorder record.EventRecorder + Log logr.Logger + ResyncPeriod time.Duration +} + +func NewDataFlowReconciler(client client.Client, + log logr.Logger, + recorder record.EventRecorder, + resyncPeriod time.Duration) *DataFlowReconciler { + return &DataFlowReconciler{ + Client: client, + Recorder: recorder, + Log: log, + ResyncPeriod: resyncPeriod, + } +} + +type reconcileRequestContext struct { + context.Context + types.NamespacedName + Client client.Client + Log logr.Logger + Recorder record.EventRecorder +} + +var reconcileFuncs = []func(reconcileRequestContext) (bool, error){ + reconcileDataLoad, + reconcileDataMigrate, + reconcileDataProcess, + reconcileDataBackup, +} + +func (r *DataFlowReconciler) Reconcile(context context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("dataflow request", req.NamespacedName) + log.Info("reconcile starts") + defer log.Info("reconcile ends") + + ctx := reconcileRequestContext{ + Context: context, + NamespacedName: req.NamespacedName, + Client: r.Client, + Log: r.Log, + Recorder: r.Recorder, + } + + needRequeue := false + for _, reconcileFn := range reconcileFuncs { + opNeedRequeue, err := reconcileFn(ctx) + if err != nil { + return utils.RequeueIfError(err) + } + + // requeue if any of the 
operation needs requeue
+ needRequeue = needRequeue || opNeedRequeue
+ }
+
+ if needRequeue {
+ return utils.RequeueAfterInterval(r.ResyncPeriod)
+ }
+ return utils.NoRequeue()
+}
+
+func (r *DataFlowReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
+ handler := &handler.EnqueueRequestForObject{}
+
+ predicates := builder.WithPredicates(predicate.NewPredicateFuncs(func(obj client.Object) bool {
+ if !obj.GetDeletionTimestamp().IsZero() {
+ // No need to trigger reconciliations for deleted objects
+ return false
+ }
+
+ objNamespacedName := types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
+ opStatus, err := utils.GetOperationStatus(obj)
+ if err != nil {
+ r.Log.Info(fmt.Sprintf("skip enqueue object: %v", err.Error()), "object", objNamespacedName)
+ return false
+ }
+
+ // DataFlowReconciler only reconciles data operations that are waiting for other data operations
+ if opStatus.WaitingFor.OperationComplete != nil && *opStatus.WaitingFor.OperationComplete {
+ return true
+ }
+
+ r.Log.V(1).Info("skip enqueue object: operation not waiting", "object", objNamespacedName)
+ return false
+ }))
+
+ return ctrl.NewControllerManagedBy(mgr).
+ WithOptions(options).
+ // controller is forced to For a kind by controller-runtime
+ For(&datav1alpha1.DataBackup{}, predicates).
+ Watches(&source.Kind{Type: &datav1alpha1.DataLoad{}}, handler, predicates).
+ // Watches(&source.Kind{Type: &datav1alpha1.DataBackup{}}, handler, predicates).
+ Watches(&source.Kind{Type: &datav1alpha1.DataMigrate{}}, handler, predicates).
+ Watches(&source.Kind{Type: &datav1alpha1.DataProcess{}}, handler, predicates). 
+ Complete(r) +} + +func (r *DataFlowReconciler) ControllerName() string { + return controllerName +} diff --git a/pkg/controllers/v1alpha1/dataflow/operations.go b/pkg/controllers/v1alpha1/dataflow/operations.go new file mode 100644 index 00000000000..4a119af7299 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/operations.go @@ -0,0 +1,175 @@ +package dataflow + +import ( + "context" + "reflect" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + utilpointer "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func reconcileDataLoad(ctx reconcileRequestContext) (needRequeue bool, err error) { + dataLoad, err := utils.GetDataLoad(ctx.Client, ctx.Name, ctx.Namespace) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.V(1).Info("DataLoad not found, skip reconciling") + return false, nil + } + return true, errors.Wrap(err, "failed to get dataload") + } + + updateStatusFn := func() error { + tmp, err := utils.GetDataLoad(ctx.Client, ctx.Name, ctx.Namespace) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + return nil + } + return err + } + + toUpdate := tmp.DeepCopy() + toUpdate.Status.WaitingFor.OperationComplete = utilpointer.Bool(false) + if !reflect.DeepEqual(toUpdate.Status, tmp.Status) { + return ctx.Client.Status().Update(context.TODO(), toUpdate) + } + + return nil + } + + return reconcileOperationDataFlow(ctx, dataLoad, dataLoad.Spec.RunAfter, dataLoad.Status, updateStatusFn) +} + +func reconcileDataBackup(ctx reconcileRequestContext) (needRequeue bool, err error) { + dataBackup, err := utils.GetDataBackup(ctx.Client, ctx.Name, ctx.Namespace) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.V(1).Info("DataBackup not found, skip reconciling") + return false, nil + } + return 
true, errors.Wrap(err, "failed to get databackup")
+ }
+
+ updateStatusFn := func() error {
+ tmp, err := utils.GetDataBackup(ctx.Client, ctx.Name, ctx.Namespace)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ return nil
+ }
+ return err
+ }
+
+ toUpdate := tmp.DeepCopy()
+ toUpdate.Status.WaitingFor.OperationComplete = utilpointer.Bool(false)
+ if !reflect.DeepEqual(toUpdate.Status, tmp.Status) {
+ return ctx.Client.Status().Update(context.TODO(), toUpdate)
+ }
+
+ return nil
+ }
+
+ return reconcileOperationDataFlow(ctx, dataBackup, dataBackup.Spec.RunAfter, dataBackup.Status, updateStatusFn)
+}
+
+func reconcileDataMigrate(ctx reconcileRequestContext) (needRequeue bool, err error) {
+ dataMigrate, err := utils.GetDataMigrate(ctx.Client, ctx.Name, ctx.Namespace)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ ctx.Log.V(1).Info("DataMigrate not found, skip reconciling")
+ return false, nil
+ }
+ return true, errors.Wrap(err, "failed to get datamigrate")
+ }
+
+ updateStatusFn := func() error {
+ tmp, err := utils.GetDataMigrate(ctx.Client, ctx.Name, ctx.Namespace)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ return nil
+ }
+ return err
+ }
+
+ toUpdate := tmp.DeepCopy()
+ toUpdate.Status.WaitingFor.OperationComplete = utilpointer.Bool(false)
+ if !reflect.DeepEqual(toUpdate.Status, tmp.Status) {
+ return ctx.Client.Status().Update(context.TODO(), toUpdate)
+ }
+
+ return nil
+ }
+
+ return reconcileOperationDataFlow(ctx, dataMigrate, dataMigrate.Spec.RunAfter, dataMigrate.Status, updateStatusFn)
+}
+
+func reconcileDataProcess(ctx reconcileRequestContext) (needRequeue bool, err error) {
+ dataProcess, err := utils.GetDataProcess(ctx.Client, ctx.Name, ctx.Namespace)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ ctx.Log.V(1).Info("DataProcess not found, skip reconciling")
+ return false, nil
+ }
+ return true, errors.Wrap(err, "failed to get dataprocess")
+ }
+
+ updateStatusFn := func() error {
+ tmp, err := 
utils.GetDataProcess(ctx.Client, ctx.Name, ctx.Namespace)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ return nil
+ }
+ return err
+ }
+
+ toUpdate := tmp.DeepCopy()
+ toUpdate.Status.WaitingFor.OperationComplete = utilpointer.Bool(false)
+ if !reflect.DeepEqual(toUpdate.Status, tmp.Status) {
+ return ctx.Client.Status().Update(context.TODO(), toUpdate)
+ }
+
+ return nil
+ }
+
+ return reconcileOperationDataFlow(ctx, dataProcess, dataProcess.Spec.RunAfter, dataProcess.Status, updateStatusFn)
+}
+
+func reconcileOperationDataFlow(ctx reconcileRequestContext,
+ object client.Object,
+ runAfter *datav1alpha1.OperationRef,
+ opStatus datav1alpha1.OperationStatus,
+ updateStatusFn func() error) (needRequeue bool, err error) {
+ precedingOpStatus, err := utils.GetPrecedingOperationStatus(ctx.Client, runAfter)
+ if err != nil {
+ if utils.IgnoreNotFound(err) == nil {
+ // preceding operation not found
+ ctx.Recorder.Eventf(object, corev1.EventTypeWarning, common.DataOperationNotFound, "Preceding operation %s \"%s/%s\" not found",
+ runAfter.OperationKind,
+ runAfter.Namespace,
+ runAfter.Name)
+ return true, nil
+ }
+ return true, errors.Wrapf(err, "failed to get preceding operation status (spec.runAfter: %v)", runAfter)
+ }
+
+ if precedingOpStatus != nil && precedingOpStatus.Phase != common.PhaseComplete {
+ ctx.Recorder.Eventf(object, corev1.EventTypeNormal, common.DataOperationWaiting, "Waiting for operation %s \"%s/%s\" to complete",
+ runAfter.OperationKind,
+ runAfter.Namespace,
+ runAfter.Name)
+ return true, nil
+ }
+
+ // set opStatus.waitingFor.operationComplete back to false
+ err = retry.RetryOnConflict(retry.DefaultBackoff, updateStatusFn)
+
+ if err != nil {
+ return true, errors.Wrapf(err, "failed to update operation status waitingFor.OperationComplete=false")
+ }
+
+ return false, nil
+}
diff --git a/pkg/controllers/v1alpha1/dataload/implement.go b/pkg/controllers/v1alpha1/dataload/implement.go
index 0d9d455c984..fab41c9c5e0 
100644 --- a/pkg/controllers/v1alpha1/dataload/implement.go +++ b/pkg/controllers/v1alpha1/dataload/implement.go @@ -58,8 +58,8 @@ func (r *DataLoadReconciler) GetChartsDirectory() string { return utils.GetChartsDirectory() + "/" + cdataload.DataloadChart } -func (r *DataLoadReconciler) GetOperationType() dataoperation.OperationType { - return dataoperation.DataLoad +func (r *DataLoadReconciler) GetOperationType() datav1alpha1.OperationType { + return datav1alpha1.DataLoadType } func (r *DataLoadReconciler) UpdateOperationApiStatus(object client.Object, opStatus *datav1alpha1.OperationStatus) error { diff --git a/pkg/controllers/v1alpha1/datamigrate/implement.go b/pkg/controllers/v1alpha1/datamigrate/implement.go index 6cec9709cf1..ff29bb169ac 100644 --- a/pkg/controllers/v1alpha1/datamigrate/implement.go +++ b/pkg/controllers/v1alpha1/datamigrate/implement.go @@ -52,8 +52,8 @@ func (r *DataMigrateReconciler) GetChartsDirectory() string { return utils.GetChartsDirectory() + "/" + cdatamigrate.DataMigrateChart } -func (r *DataMigrateReconciler) GetOperationType() dataoperation.OperationType { - return dataoperation.DataMigrate +func (r *DataMigrateReconciler) GetOperationType() datav1alpha1.OperationType { + return datav1alpha1.DataMigrateType } func (r *DataMigrateReconciler) UpdateOperationApiStatus(object client.Object, opStatus *datav1alpha1.OperationStatus) error { diff --git a/pkg/controllers/v1alpha1/dataprocess/implement.go b/pkg/controllers/v1alpha1/dataprocess/implement.go index 01b8f60c8aa..7ff273f5b36 100644 --- a/pkg/controllers/v1alpha1/dataprocess/implement.go +++ b/pkg/controllers/v1alpha1/dataprocess/implement.go @@ -56,8 +56,8 @@ func (r *DataProcessReconciler) GetChartsDirectory() string { } // GetOperationType get the data operation type -func (r *DataProcessReconciler) GetOperationType() dataoperation.OperationType { - return dataoperation.DataProcess +func (r *DataProcessReconciler) GetOperationType() datav1alpha1.OperationType { + return 
datav1alpha1.DataProcessType } // UpdateOperationApiStatus update the data operation status, object is the data operation crd instance. diff --git a/pkg/dataoperation/context.go b/pkg/dataoperation/context.go index b43c2c712c2..f3fa695407a 100644 --- a/pkg/dataoperation/context.go +++ b/pkg/dataoperation/context.go @@ -22,14 +22,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type OperationType string - -const ( - DataLoad OperationType = "DataLoad" - DataBackup OperationType = "DataBackup" - DataMigrate OperationType = "DataMigrate" - DataProcess OperationType = "DataProcess" -) +// type OperationType string + +// const ( +// DataLoad OperationType = "DataLoad" +// DataBackup OperationType = "DataBackup" +// DataMigrate OperationType = "DataMigrate" +// DataProcess OperationType = "DataProcess" +// ) // ReconcileRequestContext loads or applys the configuration state of a service. type ReconcileRequestContext struct { diff --git a/pkg/dataoperation/interface.go b/pkg/dataoperation/interface.go index eaf79c60903..3a5f2977cc7 100644 --- a/pkg/dataoperation/interface.go +++ b/pkg/dataoperation/interface.go @@ -36,7 +36,7 @@ type OperationInterface interface { GetChartsDirectory() string // GetOperationType get the data operation type - GetOperationType() OperationType + GetOperationType() datav1alpha1.OperationType // UpdateOperationApiStatus update the data operation status, object is the data operation crd instance. UpdateOperationApiStatus(object client.Object, opStatus *datav1alpha1.OperationStatus) error diff --git a/pkg/ddc/alluxio/operate.go b/pkg/ddc/alluxio/operate.go index 8d7be772c21..7ca14540f49 100644 --- a/pkg/ddc/alluxio/operate.go +++ b/pkg/ddc/alluxio/operate.go @@ -17,6 +17,7 @@ limitations under the License. 
package alluxio import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -28,13 +29,13 @@ func (e *AlluxioEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestC operationType := operation.GetOperationType() switch operationType { - case dataoperation.DataBackup: + case datav1alpha1.DataBackupType: valueFileName, err = e.generateDataBackupValueFile(ctx, object) return valueFileName, err - case dataoperation.DataLoad: + case datav1alpha1.DataLoadType: valueFileName, err = e.generateDataLoadValueFile(ctx, object) return valueFileName, err - case dataoperation.DataProcess: + case datav1alpha1.DataProcessType: valueFileName, err = e.generateDataProcessValueFile(ctx, object) return valueFileName, err default: diff --git a/pkg/ddc/base/operation.go b/pkg/ddc/base/operation.go index 4cb4a5aa101..3e785a9bead 100644 --- a/pkg/ddc/base/operation.go +++ b/pkg/ddc/base/operation.go @@ -20,6 +20,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + utilpointer "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -37,7 +38,7 @@ func (t *TemplateEngine) Operate(ctx cruntime.ReconcileRequestContext, object cl // runtime engine override the template engine switch operateType { - case dataoperation.DataBackup: + case datav1alpha1.DataBackupType: ownImpl, ok := t.Implement.(Databackuper) if ok { targetDataBackup, success := object.(*datav1alpha1.DataBackup) @@ -92,6 +93,14 @@ func (t *TemplateEngine) reconcileNone(ctx cruntime.ReconcileRequestContext, obj } opStatus.Duration = "Unfinished" opStatus.Infos = map[string]string{} + + if exists, err := utils.HasPrecedingOperation(object); err != nil { + log.Error(err, "failed to check if object has specifies spec.runAfter") + return utils.RequeueIfError(err) + } else if exists { + 
opStatus.WaitingFor.OperationComplete = utilpointer.Bool(true) + } + if err = operation.UpdateOperationApiStatus(object, opStatus); err != nil { log.Error(err, fmt.Sprintf("failed to update the %s", operation.GetOperationType())) return utils.RequeueIfError(err) @@ -105,7 +114,13 @@ func (t *TemplateEngine) reconcilePending(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus, operation dataoperation.OperationInterface) (ctrl.Result, error) { log := ctx.Log.WithName("reconcilePending") - // 1. set current data operation to dataset + // 1. check preceding operation status + if opStatus.WaitingFor.OperationComplete != nil && *opStatus.WaitingFor.OperationComplete { + // when operationComplete set back to false, a new reconcilation loop will be triggered, so no requeue here. + return utils.NoRequeue() + } + + // 2. set current data operation to dataset err := SetDataOperationInTargetDataset(ctx, object, operation, t) if err != nil { return utils.RequeueAfterInterval(20 * time.Second) diff --git a/pkg/ddc/base/operation_helm.go b/pkg/ddc/base/operation_helm.go index 3ac9b1dd18f..9e39929c9f5 100644 --- a/pkg/ddc/base/operation_helm.go +++ b/pkg/ddc/base/operation_helm.go @@ -18,6 +18,8 @@ package base import ( "fmt" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" @@ -49,7 +51,7 @@ func InstallDataOperationHelmIfNotExist(ctx cruntime.ReconcileRequestContext, ob } var chartName string - if operation.GetOperationType() == dataoperation.DataProcess { + if operation.GetOperationType() == datav1alpha1.DataProcessType { // for DataProcess, all engine share the same chart chartName = operation.GetChartsDirectory() + "/" + "common" } else { diff --git a/pkg/ddc/efc/operate.go b/pkg/ddc/efc/operate.go index 52cb94674dc..c9130228746 100644 --- 
a/pkg/ddc/efc/operate.go +++ b/pkg/ddc/efc/operate.go @@ -17,6 +17,7 @@ limitations under the License. package efc import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -28,7 +29,7 @@ func (e *EFCEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestConte operationType := operation.GetOperationType() switch operationType { - case dataoperation.DataProcess: + case datav1alpha1.DataProcessType: valueFileName, err = e.generateDataProcessValueFile(ctx, object) return valueFileName, err default: diff --git a/pkg/ddc/goosefs/operate.go b/pkg/ddc/goosefs/operate.go index 8e142361408..28f1bca399c 100644 --- a/pkg/ddc/goosefs/operate.go +++ b/pkg/ddc/goosefs/operate.go @@ -17,6 +17,7 @@ limitations under the License. package goosefs import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -27,12 +28,12 @@ import ( func (e *GooseFSEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestContext, object client.Object, operation dataoperation.OperationInterface) (valueFileName string, err error) { operateType := operation.GetOperationType() - if operateType == dataoperation.DataBackup { + if operateType == datav1alpha1.DataBackupType { valueFileName, err = e.generateDataBackupValueFile(ctx, object) return valueFileName, err } - if operateType == dataoperation.DataLoad { + if operateType == datav1alpha1.DataLoadType { valueFileName, err = e.generateDataLoadValueFile(ctx, object) return valueFileName, err } diff --git a/pkg/ddc/jindo/operate.go b/pkg/ddc/jindo/operate.go index c31daf6a4ff..93737e725ff 100644 --- a/pkg/ddc/jindo/operate.go +++ b/pkg/ddc/jindo/operate.go @@ -17,6 
+17,7 @@ limitations under the License. package jindo import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -27,7 +28,7 @@ import ( func (e *JindoEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestContext, object client.Object, operation dataoperation.OperationInterface) (valueFileName string, err error) { operationType := operation.GetOperationType() - if operationType == dataoperation.DataLoad { + if operationType == datav1alpha1.DataLoadType { valueFileName, err = e.generateDataLoadValueFile(ctx, object) return valueFileName, err } diff --git a/pkg/ddc/jindocache/operate.go b/pkg/ddc/jindocache/operate.go index 6d760820266..eb9832745aa 100644 --- a/pkg/ddc/jindocache/operate.go +++ b/pkg/ddc/jindocache/operate.go @@ -17,6 +17,7 @@ limitations under the License. package jindocache import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -27,7 +28,7 @@ import ( func (e *JindoCacheEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestContext, object client.Object, operation dataoperation.OperationInterface) (valueFileName string, err error) { operationType := operation.GetOperationType() - if operationType == dataoperation.DataLoad { + if operationType == datav1alpha1.DataLoadType { valueFileName, err = e.generateDataLoadValueFile(ctx, object) return valueFileName, err } diff --git a/pkg/ddc/jindofsx/operate.go b/pkg/ddc/jindofsx/operate.go index a8d506cf6e5..3301cba7f89 100644 --- a/pkg/ddc/jindofsx/operate.go +++ b/pkg/ddc/jindofsx/operate.go @@ -17,6 +17,7 @@ limitations under the License. 
package jindofsx import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -28,10 +29,10 @@ func (e *JindoFSxEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequest operationType := operation.GetOperationType() switch operationType { - case dataoperation.DataLoad: + case datav1alpha1.DataLoadType: valueFileName, err = e.generateDataLoadValueFile(ctx, object) return valueFileName, err - case dataoperation.DataProcess: + case datav1alpha1.DataProcessType: valueFileName, err = e.generateDataProcessValueFile(ctx, object) return valueFileName, err default: diff --git a/pkg/ddc/juicefs/operate.go b/pkg/ddc/juicefs/operate.go index a586b4c8223..80e11638f39 100644 --- a/pkg/ddc/juicefs/operate.go +++ b/pkg/ddc/juicefs/operate.go @@ -17,6 +17,7 @@ limitations under the License. package juicefs import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -29,13 +30,13 @@ func (j *JuiceFSEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestC operationType := operation.GetOperationType() switch operationType { - case dataoperation.DataMigrate: + case datav1alpha1.DataMigrateType: valueFileName, err = j.generateDataMigrateValueFile(ctx, object) return valueFileName, err - case dataoperation.DataLoad: + case datav1alpha1.DataLoadType: valueFileName, err = j.generateDataLoadValueFile(ctx, object) return valueFileName, err - case dataoperation.DataProcess: + case datav1alpha1.DataProcessType: valueFileName, err = j.generateDataProcessValueFile(ctx, object) return valueFileName, err default: diff --git a/pkg/ddc/thin/operate.go b/pkg/ddc/thin/operate.go index bb4c3e8dfb9..ed8fb1755b7 100644 
--- a/pkg/ddc/thin/operate.go +++ b/pkg/ddc/thin/operate.go @@ -17,6 +17,7 @@ limitations under the License. package thin import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/errors" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" @@ -28,7 +29,7 @@ func (t *ThinEngine) GetDataOperationValueFile(ctx cruntime.ReconcileRequestCont operationType := operation.GetOperationType() switch operationType { - case dataoperation.DataProcess: + case datav1alpha1.DataProcessType: valueFileName, err = t.generateDataProcessValueFile(ctx, object) return valueFileName, err default: diff --git a/pkg/utils/dataoperation.go b/pkg/utils/dataoperation.go index 7326912e970..1f693cac782 100644 --- a/pkg/utils/dataoperation.go +++ b/pkg/utils/dataoperation.go @@ -19,6 +19,8 @@ package utils import ( "context" "fmt" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -40,3 +42,74 @@ func ListDataOperationJobByCronjob(c client.Client, cronjobNamespacedName types. 
} return jobList.Items, nil } + +func GetOperationStatus(obj client.Object) (*datav1alpha1.OperationStatus, error) { + if obj == nil { + return nil, nil + } + + if dataLoad, ok := obj.(*datav1alpha1.DataLoad); ok { + return dataLoad.Status.DeepCopy(), nil + } else if dataMigrate, ok := obj.(*datav1alpha1.DataMigrate); ok { + return dataMigrate.Status.DeepCopy(), nil + } else if dataBackup, ok := obj.(*datav1alpha1.DataBackup); ok { + return dataBackup.Status.DeepCopy(), nil + } else if dataProcess, ok := obj.(*datav1alpha1.DataProcess); ok { + return dataProcess.Status.DeepCopy(), nil + } + + return nil, fmt.Errorf("obj is not of any data operation type") +} + +func GetPrecedingOperationStatus(client client.Client, opRef *datav1alpha1.OperationRef) (*datav1alpha1.OperationStatus, error) { + if opRef == nil { + return nil, nil + } + + switch opRef.OperationKind { + case datav1alpha1.DataBackupType: + object, err := GetDataBackup(client, opRef.Name, opRef.Namespace) + if err != nil { + return nil, err + } + return &object.Status, nil + case datav1alpha1.DataLoadType: + object, err := GetDataLoad(client, opRef.Name, opRef.Namespace) + if err != nil { + return nil, err + } + return &object.Status, nil + case datav1alpha1.DataMigrateType: + object, err := GetDataMigrate(client, opRef.Name, opRef.Namespace) + if err != nil { + return nil, err + } + return &object.Status, nil + case datav1alpha1.DataProcessType: + object, err := GetDataProcess(client, opRef.Name, opRef.Namespace) + if err != nil { + return nil, err + } + return &object.Status, nil + default: + return nil, fmt.Errorf("unknown data operation kind") + } +} + +func HasPrecedingOperation(obj client.Object) (has bool, err error) { + if obj == nil { + return false, nil + } + + if dataLoad, ok := obj.(*datav1alpha1.DataLoad); ok { + return dataLoad.Spec.RunAfter != nil, nil + } else if dataMigrate, ok := obj.(*datav1alpha1.DataMigrate); ok { + return dataMigrate.Spec.RunAfter != nil, nil + } else if dataBackup, 
ok := obj.(*datav1alpha1.DataBackup); ok { + return dataBackup.Spec.RunAfter != nil, nil + } else if dataProcess, ok := obj.(*datav1alpha1.DataProcess); ok { + return dataProcess.Spec.RunAfter != nil, nil + } + + return false, fmt.Errorf("obj is not of any data operation type") +}