Skip to content

Commit

Permalink
feat(backend): Remove PipelineSpec Template storage from ObjStore res…
Browse files Browse the repository at this point in the history
…ponsibilies. Fixes kubeflow#10509

Signed-off-by: Giulio Frasca <[email protected]>
  • Loading branch information
gmfrasca committed Jun 24, 2024
1 parent 991a610 commit 20e1074
Showing 1 changed file with 2 additions and 50 deletions.
52 changes: 2 additions & 50 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,6 @@ func (r *ResourceManager) CreatePipelineAndPipelineVersion(p *model.Pipeline, pv
return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version")
}

// TODO(gkcalat): consider removing this after v2beta1 GA if we adopt storing PipelineSpec in DB.
// Store the pipeline file
err = r.objectStore.AddFile(tmpl.Bytes(), r.objectStore.GetPipelineKey(newVersion.UUID))
if err != nil {
return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version due to error saving PipelineSpec to ObjectStore")
}

newPipeline.Status = model.PipelineReady
err = r.pipelineStore.UpdatePipelineStatus(
newPipeline.UUID,
Expand Down Expand Up @@ -1535,13 +1528,6 @@ func (r *ResourceManager) CreatePipelineVersion(pv *model.PipelineVersion) (*mod
return nil, util.Wrap(err, "Failed to create pipeline version in PipelineStore")
}

// TODO(gkcalat): consider removing this after v2beta1 GA if we adopt storing PipelineSpec in DB.
// Store the pipeline file
err = r.objectStore.AddFile(tmpl.Bytes(), r.objectStore.GetPipelineKey(fmt.Sprint(version.UUID)))
if err != nil {
return nil, util.Wrap(err, "Failed to create a pipeline version due to error saving PipelineSpec to ObjectStore")
}

// After pipeline version being created in DB and pipeline file being
// saved in minio server, set this pieline version to status ready.
version.Status = model.PipelineVersionReady
Expand Down Expand Up @@ -1589,7 +1575,7 @@ func (r *ResourceManager) ListPipelineVersions(pipelineId string, opts *list.Opt
// Deletes a pipeline version and the corresponding PipelineSpec.
func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error {
// Check if pipeline version exists
pipelineVersion, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
_, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
if err != nil {
return util.Wrapf(err, "Failed to delete pipeline version with id %v as it was not found", pipelineVersionId)
}
Expand All @@ -1610,41 +1596,7 @@ func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error
// either using async deletion in order for this method to be non-blocking
// or or exploring other performance optimization tools provided by gcs.
//
// TODO(gkcalat): consider removing this if we switch to storing PipelineSpec in DB.
// DeleteObject always responds with http '204' even for
// objects which do not exist. The err below will be nil.
//
// Delete based on pipeline spec URI
pipelineSpecRemoved := false
var osErr error
err = r.objectStore.DeleteFile(pipelineVersion.PipelineSpecURI)
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v with URI %v", pipelineVersionId, pipelineVersion.PipelineSpecURI))
osErr = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v with URI %v", pipelineVersionId, pipelineVersion.PipelineSpecURI)
} else {
pipelineSpecRemoved = true
}
// Delete based on pipeline version id
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersionId)))
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v", pipelineVersionId))
err = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v", pipelineVersionId)
osErr = util.Wrap(osErr, err.Error())
} else {
pipelineSpecRemoved = true
}
// Delete based on pipeline id
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v using pipeline id %v", pipelineVersionId, pipelineVersion.PipelineId))
err = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v using pipeline id %v", pipelineVersionId, pipelineVersion.PipelineId)
osErr = util.Wrap(osErr, err.Error())
} else {
pipelineSpecRemoved = true
}
if !pipelineSpecRemoved {
return util.Wrap(osErr, "Failed to delete a pipeline spec")
}

// Delete the DB entry
err = r.pipelineStore.DeletePipelineVersion(pipelineVersionId)
if err != nil {
Expand Down

0 comments on commit 20e1074

Please sign in to comment.