Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(run): not return minio error in list pipeline run #744

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 20 additions & 76 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/resource"
"github.com/instill-ai/pipeline-backend/pkg/utils"
"github.com/instill-ai/pipeline-backend/pkg/worker"

errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

"github.com/instill-ai/x/errmsg"

componentbase "github.com/instill-ai/pipeline-backend/pkg/component/base"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pipelinepb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand All @@ -65,8 +63,8 @@ func (s *service) GetHubStats(ctx context.Context) (*pipelinepb.GetHubStatsRespo
}

return &pipelinepb.GetHubStatsResponse{
NumberOfPublicPipelines: int32(hubStats.NumberOfPublicPipelines),
NumberOfFeaturedPipelines: int32(hubStats.NumberOfFeaturedPipelines),
NumberOfPublicPipelines: hubStats.NumberOfPublicPipelines,
NumberOfFeaturedPipelines: hubStats.NumberOfFeaturedPipelines,
}, nil
}

Expand Down Expand Up @@ -1792,7 +1790,6 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pipelinepb.ListPipe
fileContents, err := s.minioClient.GetFilesByPaths(ctx, referenceIDs)
if err != nil {
log.Error("failed to get files from minio", zap.Error(err))
return nil, err
}

metadataMap := make(map[string][]byte)
Expand Down Expand Up @@ -1826,60 +1823,21 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pipelinepb.ListPipe

if CanViewPrivateData(run.Namespace, requesterUID) {
if len(run.Inputs) == 1 {
md, ok := metadataMap[run.Inputs[0].Name]
if !ok {
return nil, fmt.Errorf("failed to load input metadata. pipeline UID: %s input reference ID: %s", run.PipelineUID.String(), run.Inputs[0].Name)
}
pbRun.Inputs = make([]*structpb.Struct, 0)
err = json.Unmarshal(md, &pbRun.Inputs)
if err != nil {
return nil, err
}

key := run.Inputs[0].Name
pbRun.Inputs = parseMetadataToStructArray(metadataMap, log, key, "input",
zap.String("pipelineUID", run.PipelineUID.String()), zap.String("inputReferenceID", key))
jvallesm marked this conversation as resolved.
Show resolved Hide resolved
}

if len(run.Outputs) == 1 {
md, ok := metadataMap[run.Outputs[0].Name]
if !ok {
return nil, fmt.Errorf("failed to load output metadata. pipeline UID: %s output reference ID: %s", run.PipelineUID.String(), run.Outputs[0].Name)
}
pbRun.Outputs = make([]*structpb.Struct, 0)
err = json.Unmarshal(md, &pbRun.Outputs)
if err != nil {
return nil, err
}
key := run.Outputs[0].Name
pbRun.Outputs = parseMetadataToStructArray(metadataMap, log, key, "output",
zap.String("pipelineUID", run.PipelineUID.String()), zap.String("outputReferenceID", key))
}
if len(run.RecipeSnapshot) == 1 {
md, ok := metadataMap[run.RecipeSnapshot[0].Name]
if !ok {
return nil, fmt.Errorf("failed to load recipe metadata. pipeline UID: %s recipe reference ID: %s", run.PipelineUID.String(), run.RecipeSnapshot[0].Name)
}
r := make(map[string]any)
err = json.Unmarshal(md, &r)
if err != nil {
return nil, fmt.Errorf("failed to load recipe metadata. pipeline UID: %s recipe reference ID: %s", run.PipelineUID.String(), run.RecipeSnapshot[0].Name)
}
pbRun.RecipeSnapshot, err = structpb.NewStruct(r)
if err != nil {
return nil, err
}

dbRecipe := &datamodel.Recipe{}
err = json.Unmarshal(md, dbRecipe)
if err != nil {
return nil, fmt.Errorf("failed to load recipe metadata. pipeline UID: %s recipe reference ID: %s", run.PipelineUID.String(), run.RecipeSnapshot[0].Name)
}

err = s.converter.IncludeDetailInRecipe(ctx, "", dbRecipe, false)
if err != nil {
return nil, fmt.Errorf("failed to load recipe metadata. pipeline UID: %s recipe reference ID: %s", run.PipelineUID.String(), run.RecipeSnapshot[0].Name)
}
pbRun.DataSpecification, err = s.converter.GeneratePipelineDataSpec(dbRecipe.Variable, dbRecipe.Output, dbRecipe.Component)
if err != nil {
// Some recipes cannot generate a DataSpecification, so we
// can skip them.
pbRun.DataSpecification = nil
}

if len(run.RecipeSnapshot) == 1 {
key := run.RecipeSnapshot[0].Name
pbRun.RecipeSnapshot, pbRun.DataSpecification = parseRecipeMetadata(metadataMap, log, s.converter, key, "recipe",
zap.String("pipelineUID", run.PipelineUID.String()), zap.String("recipeReferenceID", key))
}
}

Expand Down Expand Up @@ -1942,7 +1900,6 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pipelinepb.ListCom
fileContents, err := s.minioClient.GetFilesByPaths(ctx, referenceIDs)
if err != nil {
log.Error("failed to get files from minio", zap.Error(err))
return nil, err
}

metadataMap := make(map[string][]byte)
Expand All @@ -1960,27 +1917,14 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pipelinepb.ListCom

if CanViewPrivateData(dbPipelineRun.Namespace, requesterUID) {
if len(run.Inputs) == 1 {
md, ok := metadataMap[run.Inputs[0].Name]
if !ok {
return nil, fmt.Errorf("failed to load input metadata. component UID: %s input reference ID: %s", run.ComponentID, run.Inputs[0].Name)
}
pbRun.Inputs = make([]*structpb.Struct, 0)
err = json.Unmarshal(md, &pbRun.Inputs)
if err != nil {
return nil, err
}

key := run.Inputs[0].Name
pbRun.Inputs = parseMetadataToStructArray(metadataMap, log, key, "input",
zap.String("ComponentID", run.ComponentID), zap.String("inputReferenceID", key))
}
if len(run.Outputs) == 1 {
md, ok := metadataMap[run.Outputs[0].Name]
if !ok {
return nil, fmt.Errorf("failed to load output metadata. component UID: %s output reference ID: %s", run.ComponentID, run.Outputs[0].Name)
}
pbRun.Outputs = make([]*structpb.Struct, 0)
err = json.Unmarshal(md, &pbRun.Outputs)
if err != nil {
return nil, err
}
key := run.Outputs[0].Name
pbRun.Outputs = parseMetadataToStructArray(metadataMap, log, key, "output",
zap.String("ComponentID", run.ComponentID), zap.String("outputReferenceID", key))
}
}
pbComponentRuns[i] = pbRun
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/pipeline_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestService_ListPipelineRuns(t *testing.T) {
}, nil)

mockMinio := mock.NewMinioIMock(mc)
mockMinio.GetFilesByPathsMock.Return(nil, nil)
mockMinio.GetFilesByPathsMock.Return(nil, fmt.Errorf("some errors"))

for i, testCase := range testCases {
c.Run(fmt.Sprintf("get pipeline run with permissions test case %d %s", i+1, testCase.description), func(c *qt.C) {
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestService_ListPipelineRuns_OrgResource(t *testing.T) {
}, nil)

mockMinio := mock.NewMinioIMock(mc)
mockMinio.GetFilesByPathsMock.Return(nil, nil)
mockMinio.GetFilesByPathsMock.Return(nil, fmt.Errorf("some error happens"))

for i, testCase := range testCases {
c.Run(fmt.Sprintf("get pipeline run with permissions test case %d %s", i+1, testCase.description), func(c *qt.C) {
Expand Down
56 changes: 55 additions & 1 deletion pkg/service/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ package service

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"time"

"github.com/gofrs/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/instill-ai/pipeline-backend/pkg/constant"
"github.com/instill-ai/pipeline-backend/pkg/datamodel"
"github.com/instill-ai/pipeline-backend/pkg/resource"

errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

runpb "github.com/instill-ai/protogen-go/common/run/v1alpha"
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pipelinepb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
Expand Down Expand Up @@ -159,3 +161,55 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline
func CanViewPrivateData(namespace, requesterUID string) bool {
return namespace == requesterUID
}

func parseMetadataToStructArray(metadataMap map[string][]byte, log *zap.Logger, key string, metadataType string, logFields ...zap.Field) []*structpb.Struct {
md, ok := metadataMap[key]
if !ok {
log.Error(fmt.Sprintf("failed to load %s metadata", metadataType), logFields...)
jvallesm marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

structArr := make([]*structpb.Struct, 0)
if err := json.Unmarshal(md, &structArr); err != nil {
log.Error(fmt.Sprintf("failed to parse %s metadata", metadataType), logFields...)
jvallesm marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

return structArr
}

func parseRecipeMetadata(metadataMap map[string][]byte, log *zap.Logger, converter Converter, key string, metadataType string, logFields ...zap.Field) (*structpb.Struct, *pipelinepb.DataSpecification) {
md, ok := metadataMap[key]
if !ok {
log.Error(fmt.Sprintf("failed to load %s metadata", metadataType), logFields...)
jvallesm marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

r := make(map[string]any)
err := json.Unmarshal(md, &r)
if err != nil {
log.Error(fmt.Sprintf("failed to unmarshal %s metadata to map", metadataType), logFields...)
return nil, nil
}

pbStruct, err := structpb.NewStruct(r)
if err != nil {
log.Error(fmt.Sprintf("failed to convert %s metadata to struct", metadataType), logFields...)
return nil, nil
}

dbRecipe := &datamodel.Recipe{}
if err = json.Unmarshal(md, dbRecipe); err != nil {
log.Error(fmt.Sprintf("failed to unmarshal %s metadata to datamodel", metadataType), logFields...)
return nil, nil
}

if err = converter.IncludeDetailInRecipe(context.Background(), "", dbRecipe, false); err != nil {
log.Error("IncludeDetailInRecipe failed", logFields...)
return nil, nil
}

// Some recipes cannot generate a DataSpecification, so we can ignore the error.
dataSpec, _ := converter.GeneratePipelineDataSpec(dbRecipe.Variable, dbRecipe.Output, dbRecipe.Component)
return pbStruct, dataSpec
}
Loading