Skip to content

Commit

Permalink
feat(run): add requester id in list pipeline run response
Browse files Browse the repository at this point in the history
  • Loading branch information
joremysh committed Oct 23, 2024
1 parent b496f2a commit 147ee56
Show file tree
Hide file tree
Showing 7 changed files with 825 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/iFaceless/godub v0.0.0-20200728093528-a30bb4d1a0f1
github.com/iancoleman/strcase v0.3.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241022025309-9afd9231a821
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.5.0-alpha
github.com/itchyny/gojq v0.12.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d h1:jf2RQtRFNxnPMkjTD0AAqXDXO8lHYOrWU3Hrr+yGEzY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241022025309-9afd9231a821 h1:yDtTSAjeM7gkiHkL2XX4yil+VHOIm2kbyk0ZlHskRW8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241022025309-9afd9231a821/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.5.0-alpha h1:xIeIvrLzwJYOBmZwOePVFVkKGEcMDqtHmn1cfVVNlIE=
Expand Down
781 changes: 779 additions & 2 deletions pkg/component/internal/mock/artifact_public_service_client_mock.gen.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pkg/handler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1995,8 +1995,7 @@ func (h *PublicHandler) ListComponentRuns(ctx context.Context, req *pb.ListCompo
return resp, nil
}

// todo: rename function to ListPipelineRunsByRequester in protobuf and update message names here
func (h *PublicHandler) ListPipelineRunsByCreditOwner(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error) {
func (h *PublicHandler) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByRequesterRequest) (*pb.ListPipelineRunsByRequesterResponse, error) {
logger, _ := logger.GetZapLogger(ctx)
logUUID, _ := uuid.NewV4()
logger.Info("ListPipelineRunsByRequester starts", zap.String("logUUID", logUUID.String()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Service interface {

ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRunsRequest, filter filtering.Filter) (*pb.ListPipelineRunsResponse, error)
ListComponentRuns(ctx context.Context, req *pb.ListComponentRunsRequest, filter filtering.Filter) (*pb.ListComponentRunsResponse, error)
ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error)
ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByRequesterRequest) (*pb.ListPipelineRunsByRequesterResponse, error)

GetIntegration(_ context.Context, id string, _ pb.View) (*pb.Integration, error)
ListIntegrations(context.Context, *pb.ListIntegrationsRequest) (*pb.ListIntegrationsResponse, error)
Expand Down
12 changes: 8 additions & 4 deletions pkg/service/pipeline_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,11 @@ func TestService_ListPipelineRuns(t *testing.T) {
runs, err := svc.ListPipelineRuns(ctxWithHeader, req, filtering.Filter{})
c.Assert(err, qt.IsNil)
if testCase.canView {
c.Check(len(runs.PipelineRuns), qt.Equals, 1)
c.Check(runs.PipelineRuns, qt.HasLen, 1)
c.Check(runs.PipelineRuns[0].RequesterId, qt.IsNotNil)
c.Check(*runs.PipelineRuns[0].RequesterId, qt.Equals, "test-user")
} else {
c.Check(len(runs.PipelineRuns), qt.Equals, 0)
c.Check(runs.PipelineRuns, qt.HasLen, 0)
}
})
}
Expand Down Expand Up @@ -472,9 +474,11 @@ func TestService_ListPipelineRuns_OrgResource(t *testing.T) {
runs, err := svc.ListPipelineRuns(ctxWithHeader, req, filtering.Filter{})
c.Assert(err, qt.IsNil)
if testCase.canView {
c.Check(len(runs.PipelineRuns), qt.Equals, 1)
c.Check(runs.PipelineRuns, qt.HasLen, 1)
c.Check(runs.PipelineRuns[0].RequesterId, qt.IsNotNil)
c.Check(*runs.PipelineRuns[0].RequesterId, qt.Equals, "test-user")
} else {
c.Check(len(runs.PipelineRuns), qt.Equals, 0)
c.Check(runs.PipelineRuns, qt.HasLen, 0)
}
})
}
Expand Down
49 changes: 33 additions & 16 deletions pkg/service/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,19 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns
metadataMap[content.Name] = content.Content
}

requesterIDMap := make(map[string]struct{})
userUIDMap := make(map[string]struct{})
for _, pipelineRun := range pipelineRuns {
requesterIDMap[pipelineRun.TriggeredBy] = struct{}{}
userUIDMap[pipelineRun.TriggeredBy] = struct{}{}
userUIDMap[pipelineRun.Namespace] = struct{}{}
}

runnerMap := make(map[string]*string)
for requesterID := range requesterIDMap {
userIDMap := make(map[string]*string)
for requesterID := range userUIDMap {
runner, err := s.mgmtPrivateServiceClient.CheckNamespaceByUIDAdmin(ctx, &mgmtpb.CheckNamespaceByUIDAdminRequest{Uid: requesterID})
if err != nil {
return nil, err
}
runnerMap[requesterID] = &runner.Id
userIDMap[requesterID] = &runner.Id
}

// Convert datamodel.PipelineRun to pb.PipelineRun
Expand All @@ -140,7 +141,10 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns
if err != nil {
return nil, fmt.Errorf("failed to convert pipeline run: %w", err)
}
pbRun.RunnerId = runnerMap[run.TriggeredBy]
pbRun.RunnerId = userIDMap[run.TriggeredBy]
if requesterID, ok := userIDMap[run.Namespace]; ok && requesterID != nil {
pbRun.RequesterId = *requesterID
}

if CanViewPrivateData(run.Namespace, requesterUID) {
if len(run.Inputs) == 1 {
Expand Down Expand Up @@ -273,10 +277,18 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu
}, nil
}

func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error) {
func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByRequesterRequest) (*pb.ListPipelineRunsByRequesterResponse, error) {
page := s.pageInRange(req.GetPage())
pageSize := s.pageSizeInRange(req.GetPageSize())
requesterUID, _ := utils.GetRequesterUIDAndUserUID(ctx)

ns, err := s.GetRscNamespace(ctx, req.GetRequesterId())
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

if err := s.checkNamespacePermission(ctx, ns); err != nil {
return nil, fmt.Errorf("checking namespace permissions: %w", err)
}

declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{
filtering.DeclareStandardFunctions(),
Expand Down Expand Up @@ -312,7 +324,7 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP
}

pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsByRequester(ctx, repository.GetPipelineRunsByRequesterParams{
RequesterUID: requesterUID,
RequesterUID: ns.NsUID.String(),
StartTimeBegin: startedTimeBegin,
StartTimeEnd: startedTimeEnd,
Page: page,
Expand All @@ -324,18 +336,19 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP
return nil, fmt.Errorf("getting pipeline runs by requester: %w", err)
}

requesterIDMap := make(map[string]struct{})
userUIDMap := make(map[string]struct{})
for _, pipelineRun := range pipelineRuns {
requesterIDMap[pipelineRun.TriggeredBy] = struct{}{}
userUIDMap[pipelineRun.TriggeredBy] = struct{}{}
userUIDMap[pipelineRun.Namespace] = struct{}{}
}

runnerMap := make(map[string]*string)
for requesterID := range requesterIDMap {
userIDMap := make(map[string]*string)
for requesterID := range userUIDMap {
runner, err := s.mgmtPrivateServiceClient.CheckNamespaceByUIDAdmin(ctx, &mgmtpb.CheckNamespaceByUIDAdminRequest{Uid: requesterID})
if err != nil {
return nil, err
}
runnerMap[requesterID] = &runner.Id
userIDMap[requesterID] = &runner.Id
}

pbPipelineRuns := make([]*pb.PipelineRun, len(pipelineRuns))
Expand All @@ -346,11 +359,15 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP
if err != nil {
return nil, fmt.Errorf("converting pipeline run: %w", err)
}
pbRun.RunnerId = runnerMap[run.TriggeredBy]
pbRun.RunnerId = userIDMap[run.TriggeredBy]
if requesterID, ok := userIDMap[run.Namespace]; ok && requesterID != nil {
pbRun.RequesterId = *requesterID
}

pbPipelineRuns[i] = pbRun
}

return &pb.ListPipelineRunsByCreditOwnerResponse{
return &pb.ListPipelineRunsByRequesterResponse{
PipelineRuns: pbPipelineRuns,
TotalSize: int32(totalCount),
Page: int32(page),
Expand Down

0 comments on commit 147ee56

Please sign in to comment.