From e1e844b4f6db226a29ba2937f0f05005a5899d49 Mon Sep 17 00:00:00 2001 From: Jeremy Shih Date: Wed, 16 Oct 2024 17:06:03 +0800 Subject: [PATCH] feat(run): run logging data list by requester API (#730) Because - the data that users could see on dashboard are different from what they can see on Runs page in a single pipeline or model This commit - add new run logging API for dashboard ![image](https://github.com/user-attachments/assets/7ff10845-4b99-4a55-a08a-90652ee67454) --- go.mod | 2 +- go.sum | 4 +- pkg/datamodel/runlogging.go | 1 + pkg/handler/pipeline.go | 17 + pkg/mock/acl_client_interface_mock.gen.go | 2 + pkg/mock/converter_mock.gen.go | 3 + .../mgmt_private_service_client_mock.gen.go | 7 +- pkg/mock/minio_i_mock.gen.go | 5 +- pkg/mock/repository_mock.gen.go | 366 +++++++++++++++++- pkg/repository/repository.go | 72 +++- pkg/repository/repository_test.go | 134 +++++++ pkg/service/main.go | 1 + pkg/service/pipeline.go | 86 ++++ pkg/service/utils.go | 1 + 14 files changed, 688 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 787991a3d..0b7a22c5e 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/iFaceless/godub v0.0.0-20200728093528-a30bb4d1a0f1 github.com/iancoleman/strcase v0.3.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408 + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a github.com/instill-ai/x v0.5.0-alpha github.com/itchyny/gojq v0.12.14 diff --git a/go.sum b/go.sum index 27c17d112..a1ec7898e 100644 --- a/go.sum +++ b/go.sum @@ -1275,8 +1275,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408 h1:GTx0g6dXxd7WUm4bvJK1N+j8zuxIaSoXD5AZ2Vl+FXI= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d h1:jf2RQtRFNxnPMkjTD0AAqXDXO8lHYOrWU3Hrr+yGEzY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw= github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw= github.com/instill-ai/x v0.5.0-alpha h1:xIeIvrLzwJYOBmZwOePVFVkKGEcMDqtHmn1cfVVNlIE= diff --git a/pkg/datamodel/runlogging.go b/pkg/datamodel/runlogging.go index 645b165d3..6c4f38f4f 100644 --- a/pkg/datamodel/runlogging.go +++ b/pkg/datamodel/runlogging.go @@ -67,6 +67,7 @@ func (j *JSONB) Scan(value interface{}) error { type PipelineRun struct { PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run + Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed) Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API) diff --git a/pkg/handler/pipeline.go b/pkg/handler/pipeline.go index 8106d82d8..8e084924d 100644 --- a/pkg/handler/pipeline.go +++ b/pkg/handler/pipeline.go @@ -1994,3 +1994,20 @@ func (h *PublicHandler) ListComponentRuns(ctx context.Context, req *pb.ListCompo return resp, nil } + +// todo: rename function to ListPipelineRunsByRequester in protobuf and update message names here +func (h *PublicHandler) ListPipelineRunsByCreditOwner(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error) { + logger, _ := logger.GetZapLogger(ctx) + logUUID, _ := uuid.NewV4() + logger.Info("ListPipelineRunsByRequester starts", zap.String("logUUID", logUUID.String())) + + resp, err := h.service.ListPipelineRunsByRequester(ctx, req) + if err != nil { + logger.Error("failed in ListPipelineRunsByRequester", zap.String("logUUID", logUUID.String()), zap.Error(err)) + return nil, status.Error(codes.Internal, "Failed to list pipeline runs") + } + + logger.Info("ListPipelineRunsByRequester finished", zap.String("logUUID", logUUID.String())) + + return resp, nil +} diff --git a/pkg/mock/acl_client_interface_mock.gen.go b/pkg/mock/acl_client_interface_mock.gen.go index 2cbced371..a11dacc4b 100644 --- a/pkg/mock/acl_client_interface_mock.gen.go +++ b/pkg/mock/acl_client_interface_mock.gen.go @@ -5,11 +5,13 @@ package mock import ( "context" "sync" + mm_atomic "sync/atomic" mm_time "time" "github.com/gofrs/uuid" "github.com/gojuno/minimock/v3" + "github.com/instill-ai/pipeline-backend/pkg/datamodel" ) diff --git a/pkg/mock/converter_mock.gen.go b/pkg/mock/converter_mock.gen.go index bdcaae433..bc5267f18 100644 --- a/pkg/mock/converter_mock.gen.go +++ b/pkg/mock/converter_mock.gen.go @@ -5,13 +5,16 @@ package mock import ( "context" "sync" + mm_atomic "sync/atomic" mm_time "time" "github.com/gofrs/uuid" "github.com/gojuno/minimock/v3" + "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/resource" + pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" ) diff --git a/pkg/mock/mgmt_private_service_client_mock.gen.go b/pkg/mock/mgmt_private_service_client_mock.gen.go index a3a7b010c..ea7f9bd36 100644 --- a/pkg/mock/mgmt_private_service_client_mock.gen.go +++ b/pkg/mock/mgmt_private_service_client_mock.gen.go @@ -3,14 +3,17 @@ package mock import ( - context "context" "sync" + + context "context" mm_atomic "sync/atomic" mm_time "time" "github.com/gojuno/minimock/v3" - mm_mgmtv1beta "github.com/instill-ai/protogen-go/core/mgmt/v1beta" + grpc "google.golang.org/grpc" + + mm_mgmtv1beta "github.com/instill-ai/protogen-go/core/mgmt/v1beta" ) // MgmtPrivateServiceClientMock implements mm_mgmtv1beta.MgmtPrivateServiceClient diff --git a/pkg/mock/minio_i_mock.gen.go b/pkg/mock/minio_i_mock.gen.go index 2d646cbff..8692731de 100644 --- a/pkg/mock/minio_i_mock.gen.go +++ b/pkg/mock/minio_i_mock.gen.go @@ -5,12 +5,15 @@ package mock import ( "context" "sync" + mm_atomic "sync/atomic" mm_time "time" "github.com/gojuno/minimock/v3" - mm_minio "github.com/instill-ai/pipeline-backend/pkg/minio" + minio "github.com/minio/minio-go/v7" + + mm_minio "github.com/instill-ai/pipeline-backend/pkg/minio" ) // MinioIMock implements mm_minio.MinioI diff --git a/pkg/mock/repository_mock.gen.go b/pkg/mock/repository_mock.gen.go index e086d1a3c..1efa23c82 100644 --- a/pkg/mock/repository_mock.gen.go +++ b/pkg/mock/repository_mock.gen.go @@ -5,18 +5,21 @@ package mock import ( "context" "sync" + mm_atomic "sync/atomic" mm_time "time" "github.com/gofrs/uuid" "github.com/gojuno/minimock/v3" - "github.com/instill-ai/pipeline-backend/pkg/datamodel" - mm_repository "github.com/instill-ai/pipeline-backend/pkg/repository" - pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" "go.einride.tech/aip/filtering" "go.einride.tech/aip/ordering" "gorm.io/gorm" "gorm.io/gorm/clause" + + "github.com/instill-ai/pipeline-backend/pkg/datamodel" + + mm_repository "github.com/instill-ai/pipeline-backend/pkg/repository" + pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" ) // RepositoryMock implements mm_repository.Repository @@ -171,6 +174,13 @@ type RepositoryMock struct { beforeGetPaginatedComponentRunsByPipelineRunIDWithPermissionsCounter uint64 GetPaginatedComponentRunsByPipelineRunIDWithPermissionsMock mRepositoryMockGetPaginatedComponentRunsByPipelineRunIDWithPermissions + funcGetPaginatedPipelineRunsByRequester func(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) (pa1 []datamodel.PipelineRun, i1 int64, err error) + funcGetPaginatedPipelineRunsByRequesterOrigin string + inspectFuncGetPaginatedPipelineRunsByRequester func(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) + afterGetPaginatedPipelineRunsByRequesterCounter uint64 + beforeGetPaginatedPipelineRunsByRequesterCounter uint64 + GetPaginatedPipelineRunsByRequesterMock mRepositoryMockGetPaginatedPipelineRunsByRequester + funcGetPaginatedPipelineRunsWithPermissions func(ctx context.Context, requesterUID string, pipelineUID string, page int, pageSize int, filter filtering.Filter, order ordering.OrderBy, isOwner bool) (pa1 []datamodel.PipelineRun, i1 int64, err error) funcGetPaginatedPipelineRunsWithPermissionsOrigin string inspectFuncGetPaginatedPipelineRunsWithPermissions func(ctx context.Context, requesterUID string, pipelineUID string, page int, pageSize int, filter filtering.Filter, order ordering.OrderBy, isOwner bool) @@ -446,6 +456,9 @@ func NewRepositoryMock(t minimock.Tester) *RepositoryMock { m.GetPaginatedComponentRunsByPipelineRunIDWithPermissionsMock = mRepositoryMockGetPaginatedComponentRunsByPipelineRunIDWithPermissions{mock: m} m.GetPaginatedComponentRunsByPipelineRunIDWithPermissionsMock.callArgs = []*RepositoryMockGetPaginatedComponentRunsByPipelineRunIDWithPermissionsParams{} + m.GetPaginatedPipelineRunsByRequesterMock = mRepositoryMockGetPaginatedPipelineRunsByRequester{mock: m} + m.GetPaginatedPipelineRunsByRequesterMock.callArgs = []*RepositoryMockGetPaginatedPipelineRunsByRequesterParams{} + m.GetPaginatedPipelineRunsWithPermissionsMock = mRepositoryMockGetPaginatedPipelineRunsWithPermissions{mock: m} m.GetPaginatedPipelineRunsWithPermissionsMock.callArgs = []*RepositoryMockGetPaginatedPipelineRunsWithPermissionsParams{} @@ -8474,6 +8487,350 @@ func (m *RepositoryMock) MinimockGetPaginatedComponentRunsByPipelineRunIDWithPer } } +type mRepositoryMockGetPaginatedPipelineRunsByRequester struct { + optional bool + mock *RepositoryMock + defaultExpectation *RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation + expectations []*RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation + + callArgs []*RepositoryMockGetPaginatedPipelineRunsByRequesterParams + mutex sync.RWMutex + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation specifies expectation struct of the Repository.GetPaginatedPipelineRunsByRequester +type RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation struct { + mock *RepositoryMock + params *RepositoryMockGetPaginatedPipelineRunsByRequesterParams + paramPtrs *RepositoryMockGetPaginatedPipelineRunsByRequesterParamPtrs + expectationOrigins RepositoryMockGetPaginatedPipelineRunsByRequesterExpectationOrigins + results *RepositoryMockGetPaginatedPipelineRunsByRequesterResults + returnOrigin string + Counter uint64 +} + +// RepositoryMockGetPaginatedPipelineRunsByRequesterParams contains parameters of the Repository.GetPaginatedPipelineRunsByRequester +type RepositoryMockGetPaginatedPipelineRunsByRequesterParams struct { + ctx context.Context + params mm_repository.GetPipelineRunsByRequesterParams +} + +// RepositoryMockGetPaginatedPipelineRunsByRequesterParamPtrs contains pointers to parameters of the Repository.GetPaginatedPipelineRunsByRequester +type RepositoryMockGetPaginatedPipelineRunsByRequesterParamPtrs struct { + ctx *context.Context + params *mm_repository.GetPipelineRunsByRequesterParams +} + +// RepositoryMockGetPaginatedPipelineRunsByRequesterResults contains results of the Repository.GetPaginatedPipelineRunsByRequester +type RepositoryMockGetPaginatedPipelineRunsByRequesterResults struct { + pa1 []datamodel.PipelineRun + i1 int64 + err error +} + +// RepositoryMockGetPaginatedPipelineRunsByRequesterOrigins contains origins of expectations of the Repository.GetPaginatedPipelineRunsByRequester +type RepositoryMockGetPaginatedPipelineRunsByRequesterExpectationOrigins struct { + origin string + originCtx string + originParams string +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Optional() *mRepositoryMockGetPaginatedPipelineRunsByRequester { + mmGetPaginatedPipelineRunsByRequester.optional = true + return mmGetPaginatedPipelineRunsByRequester +} + +// Expect sets up expected params for Repository.GetPaginatedPipelineRunsByRequester +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Expect(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) *mRepositoryMockGetPaginatedPipelineRunsByRequester { + if mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Set") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation = &RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation{} + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by ExpectParams functions") + } + + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.params = &RepositoryMockGetPaginatedPipelineRunsByRequesterParams{ctx, params} + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.expectationOrigins.origin = minimock.CallerInfo(1) + for _, e := range mmGetPaginatedPipelineRunsByRequester.expectations { + if minimock.Equal(e.params, mmGetPaginatedPipelineRunsByRequester.defaultExpectation.params) { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmGetPaginatedPipelineRunsByRequester.defaultExpectation.params) + } + } + + return mmGetPaginatedPipelineRunsByRequester +} + +// ExpectCtxParam1 sets up expected param ctx for Repository.GetPaginatedPipelineRunsByRequester +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) ExpectCtxParam1(ctx context.Context) *mRepositoryMockGetPaginatedPipelineRunsByRequester { + if mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Set") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation = &RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation{} + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation.params != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Expect") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs = &RepositoryMockGetPaginatedPipelineRunsByRequesterParamPtrs{} + } + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs.ctx = &ctx + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.expectationOrigins.originCtx = minimock.CallerInfo(1) + + return mmGetPaginatedPipelineRunsByRequester +} + +// ExpectParamsParam2 sets up expected param params for Repository.GetPaginatedPipelineRunsByRequester +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) ExpectParamsParam2(params mm_repository.GetPipelineRunsByRequesterParams) *mRepositoryMockGetPaginatedPipelineRunsByRequester { + if mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Set") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation = &RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation{} + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation.params != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Expect") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs = &RepositoryMockGetPaginatedPipelineRunsByRequesterParamPtrs{} + } + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.paramPtrs.params = ¶ms + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.expectationOrigins.originParams = minimock.CallerInfo(1) + + return mmGetPaginatedPipelineRunsByRequester +} + +// Inspect accepts an inspector function that has same arguments as the Repository.GetPaginatedPipelineRunsByRequester +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Inspect(f func(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams)) *mRepositoryMockGetPaginatedPipelineRunsByRequester { + if mmGetPaginatedPipelineRunsByRequester.mock.inspectFuncGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("Inspect function is already set for RepositoryMock.GetPaginatedPipelineRunsByRequester") + } + + mmGetPaginatedPipelineRunsByRequester.mock.inspectFuncGetPaginatedPipelineRunsByRequester = f + + return mmGetPaginatedPipelineRunsByRequester +} + +// Return sets up results that will be returned by Repository.GetPaginatedPipelineRunsByRequester +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Return(pa1 []datamodel.PipelineRun, i1 int64, err error) *RepositoryMock { + if mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Set") + } + + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation == nil { + mmGetPaginatedPipelineRunsByRequester.defaultExpectation = &RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation{mock: mmGetPaginatedPipelineRunsByRequester.mock} + } + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.results = &RepositoryMockGetPaginatedPipelineRunsByRequesterResults{pa1, i1, err} + mmGetPaginatedPipelineRunsByRequester.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmGetPaginatedPipelineRunsByRequester.mock +} + +// Set uses given function f to mock the Repository.GetPaginatedPipelineRunsByRequester method +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Set(f func(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) (pa1 []datamodel.PipelineRun, i1 int64, err error)) *RepositoryMock { + if mmGetPaginatedPipelineRunsByRequester.defaultExpectation != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("Default expectation is already set for the Repository.GetPaginatedPipelineRunsByRequester method") + } + + if len(mmGetPaginatedPipelineRunsByRequester.expectations) > 0 { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("Some expectations are already set for the Repository.GetPaginatedPipelineRunsByRequester method") + } + + mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester = f + mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequesterOrigin = minimock.CallerInfo(1) + return mmGetPaginatedPipelineRunsByRequester.mock +} + +// When sets expectation for the Repository.GetPaginatedPipelineRunsByRequester which will trigger the result defined by the following +// Then helper +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) When(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) *RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation { + if mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("RepositoryMock.GetPaginatedPipelineRunsByRequester mock is already set by Set") + } + + expectation := &RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation{ + mock: mmGetPaginatedPipelineRunsByRequester.mock, + params: &RepositoryMockGetPaginatedPipelineRunsByRequesterParams{ctx, params}, + expectationOrigins: RepositoryMockGetPaginatedPipelineRunsByRequesterExpectationOrigins{origin: minimock.CallerInfo(1)}, + } + mmGetPaginatedPipelineRunsByRequester.expectations = append(mmGetPaginatedPipelineRunsByRequester.expectations, expectation) + return expectation +} + +// Then sets up Repository.GetPaginatedPipelineRunsByRequester return parameters for the expectation previously defined by the When method +func (e *RepositoryMockGetPaginatedPipelineRunsByRequesterExpectation) Then(pa1 []datamodel.PipelineRun, i1 int64, err error) *RepositoryMock { + e.results = &RepositoryMockGetPaginatedPipelineRunsByRequesterResults{pa1, i1, err} + return e.mock +} + +// Times sets number of times Repository.GetPaginatedPipelineRunsByRequester should be invoked +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Times(n uint64) *mRepositoryMockGetPaginatedPipelineRunsByRequester { + if n == 0 { + mmGetPaginatedPipelineRunsByRequester.mock.t.Fatalf("Times of RepositoryMock.GetPaginatedPipelineRunsByRequester mock can not be zero") + } + mm_atomic.StoreUint64(&mmGetPaginatedPipelineRunsByRequester.expectedInvocations, n) + mmGetPaginatedPipelineRunsByRequester.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmGetPaginatedPipelineRunsByRequester +} + +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) invocationsDone() bool { + if len(mmGetPaginatedPipelineRunsByRequester.expectations) == 0 && mmGetPaginatedPipelineRunsByRequester.defaultExpectation == nil && mmGetPaginatedPipelineRunsByRequester.mock.funcGetPaginatedPipelineRunsByRequester == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmGetPaginatedPipelineRunsByRequester.mock.afterGetPaginatedPipelineRunsByRequesterCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmGetPaginatedPipelineRunsByRequester.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// GetPaginatedPipelineRunsByRequester implements mm_repository.Repository +func (mmGetPaginatedPipelineRunsByRequester *RepositoryMock) GetPaginatedPipelineRunsByRequester(ctx context.Context, params mm_repository.GetPipelineRunsByRequesterParams) (pa1 []datamodel.PipelineRun, i1 int64, err error) { + mm_atomic.AddUint64(&mmGetPaginatedPipelineRunsByRequester.beforeGetPaginatedPipelineRunsByRequesterCounter, 1) + defer mm_atomic.AddUint64(&mmGetPaginatedPipelineRunsByRequester.afterGetPaginatedPipelineRunsByRequesterCounter, 1) + + mmGetPaginatedPipelineRunsByRequester.t.Helper() + + if mmGetPaginatedPipelineRunsByRequester.inspectFuncGetPaginatedPipelineRunsByRequester != nil { + mmGetPaginatedPipelineRunsByRequester.inspectFuncGetPaginatedPipelineRunsByRequester(ctx, params) + } + + mm_params := RepositoryMockGetPaginatedPipelineRunsByRequesterParams{ctx, params} + + // Record call args + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.mutex.Lock() + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.callArgs = append(mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.callArgs, &mm_params) + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.mutex.Unlock() + + for _, e := range mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.expectations { + if minimock.Equal(*e.params, mm_params) { + mm_atomic.AddUint64(&e.Counter, 1) + return e.results.pa1, e.results.i1, e.results.err + } + } + + if mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.Counter, 1) + mm_want := mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.params + mm_want_ptrs := mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.paramPtrs + + mm_got := RepositoryMockGetPaginatedPipelineRunsByRequesterParams{ctx, params} + + if mm_want_ptrs != nil { + + if mm_want_ptrs.ctx != nil && !minimock.Equal(*mm_want_ptrs.ctx, mm_got.ctx) { + mmGetPaginatedPipelineRunsByRequester.t.Errorf("RepositoryMock.GetPaginatedPipelineRunsByRequester got unexpected parameter ctx, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.expectationOrigins.originCtx, *mm_want_ptrs.ctx, mm_got.ctx, minimock.Diff(*mm_want_ptrs.ctx, mm_got.ctx)) + } + + if mm_want_ptrs.params != nil && !minimock.Equal(*mm_want_ptrs.params, mm_got.params) { + mmGetPaginatedPipelineRunsByRequester.t.Errorf("RepositoryMock.GetPaginatedPipelineRunsByRequester got unexpected parameter params, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.expectationOrigins.originParams, *mm_want_ptrs.params, mm_got.params, minimock.Diff(*mm_want_ptrs.params, mm_got.params)) + } + + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { + mmGetPaginatedPipelineRunsByRequester.t.Errorf("RepositoryMock.GetPaginatedPipelineRunsByRequester got unexpected parameters, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.expectationOrigins.origin, *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) + } + + mm_results := mmGetPaginatedPipelineRunsByRequester.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.results + if mm_results == nil { + mmGetPaginatedPipelineRunsByRequester.t.Fatal("No results are set for the RepositoryMock.GetPaginatedPipelineRunsByRequester") + } + return (*mm_results).pa1, (*mm_results).i1, (*mm_results).err + } + if mmGetPaginatedPipelineRunsByRequester.funcGetPaginatedPipelineRunsByRequester != nil { + return mmGetPaginatedPipelineRunsByRequester.funcGetPaginatedPipelineRunsByRequester(ctx, params) + } + mmGetPaginatedPipelineRunsByRequester.t.Fatalf("Unexpected call to RepositoryMock.GetPaginatedPipelineRunsByRequester. %v %v", ctx, params) + return +} + +// GetPaginatedPipelineRunsByRequesterAfterCounter returns a count of finished RepositoryMock.GetPaginatedPipelineRunsByRequester invocations +func (mmGetPaginatedPipelineRunsByRequester *RepositoryMock) GetPaginatedPipelineRunsByRequesterAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmGetPaginatedPipelineRunsByRequester.afterGetPaginatedPipelineRunsByRequesterCounter) +} + +// GetPaginatedPipelineRunsByRequesterBeforeCounter returns a count of RepositoryMock.GetPaginatedPipelineRunsByRequester invocations +func (mmGetPaginatedPipelineRunsByRequester *RepositoryMock) GetPaginatedPipelineRunsByRequesterBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmGetPaginatedPipelineRunsByRequester.beforeGetPaginatedPipelineRunsByRequesterCounter) +} + +// Calls returns a list of arguments used in each call to RepositoryMock.GetPaginatedPipelineRunsByRequester. +// The list is in the same order as the calls were made (i.e. recent calls have a higher index) +func (mmGetPaginatedPipelineRunsByRequester *mRepositoryMockGetPaginatedPipelineRunsByRequester) Calls() []*RepositoryMockGetPaginatedPipelineRunsByRequesterParams { + mmGetPaginatedPipelineRunsByRequester.mutex.RLock() + + argCopy := make([]*RepositoryMockGetPaginatedPipelineRunsByRequesterParams, len(mmGetPaginatedPipelineRunsByRequester.callArgs)) + copy(argCopy, mmGetPaginatedPipelineRunsByRequester.callArgs) + + mmGetPaginatedPipelineRunsByRequester.mutex.RUnlock() + + return argCopy +} + +// MinimockGetPaginatedPipelineRunsByRequesterDone returns true if the count of the GetPaginatedPipelineRunsByRequester invocations corresponds +// the number of defined expectations +func (m *RepositoryMock) MinimockGetPaginatedPipelineRunsByRequesterDone() bool { + if m.GetPaginatedPipelineRunsByRequesterMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.GetPaginatedPipelineRunsByRequesterMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.GetPaginatedPipelineRunsByRequesterMock.invocationsDone() +} + +// MinimockGetPaginatedPipelineRunsByRequesterInspect logs each unmet expectation +func (m *RepositoryMock) MinimockGetPaginatedPipelineRunsByRequesterInspect() { + for _, e := range m.GetPaginatedPipelineRunsByRequesterMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Errorf("Expected call to RepositoryMock.GetPaginatedPipelineRunsByRequester at\n%s with params: %#v", e.expectationOrigins.origin, *e.params) + } + } + + afterGetPaginatedPipelineRunsByRequesterCounter := mm_atomic.LoadUint64(&m.afterGetPaginatedPipelineRunsByRequesterCounter) + // if default expectation was set then invocations count should be greater than zero + if m.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation != nil && afterGetPaginatedPipelineRunsByRequesterCounter < 1 { + if m.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.params == nil { + m.t.Errorf("Expected call to RepositoryMock.GetPaginatedPipelineRunsByRequester at\n%s", m.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.returnOrigin) + } else { + m.t.Errorf("Expected call to RepositoryMock.GetPaginatedPipelineRunsByRequester at\n%s with params: %#v", m.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.expectationOrigins.origin, *m.GetPaginatedPipelineRunsByRequesterMock.defaultExpectation.params) + } + } + // if func was set then invocations count should be greater than zero + if m.funcGetPaginatedPipelineRunsByRequester != nil && afterGetPaginatedPipelineRunsByRequesterCounter < 1 { + m.t.Errorf("Expected call to RepositoryMock.GetPaginatedPipelineRunsByRequester at\n%s", m.funcGetPaginatedPipelineRunsByRequesterOrigin) + } + + if !m.GetPaginatedPipelineRunsByRequesterMock.invocationsDone() && afterGetPaginatedPipelineRunsByRequesterCounter > 0 { + m.t.Errorf("Expected %d calls to RepositoryMock.GetPaginatedPipelineRunsByRequester at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.GetPaginatedPipelineRunsByRequesterMock.expectedInvocations), m.GetPaginatedPipelineRunsByRequesterMock.expectedInvocationsOrigin, afterGetPaginatedPipelineRunsByRequesterCounter) + } +} + type mRepositoryMockGetPaginatedPipelineRunsWithPermissions struct { optional bool mock *RepositoryMock @@ -20237,6 +20594,8 @@ func (m *RepositoryMock) MinimockFinish() { m.MinimockGetPaginatedComponentRunsByPipelineRunIDWithPermissionsInspect() + m.MinimockGetPaginatedPipelineRunsByRequesterInspect() + m.MinimockGetPaginatedPipelineRunsWithPermissionsInspect() m.MinimockGetPipelineByIDAdminInspect() @@ -20338,6 +20697,7 @@ func (m *RepositoryMock) minimockDone() bool { m.MinimockGetNamespacePipelineReleaseByIDDone() && m.MinimockGetNamespaceSecretByIDDone() && m.MinimockGetPaginatedComponentRunsByPipelineRunIDWithPermissionsDone() && + m.MinimockGetPaginatedPipelineRunsByRequesterDone() && m.MinimockGetPaginatedPipelineRunsWithPermissionsDone() && m.MinimockGetPipelineByIDAdminDone() && m.MinimockGetPipelineByUIDDone() && diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index db615b480..1a1776d17 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -24,11 +24,9 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/logger" "github.com/instill-ai/pipeline-backend/pkg/resource" - - errdomain "github.com/instill-ai/pipeline-backend/pkg/errors" - "github.com/instill-ai/x/paginate" + errdomain "github.com/instill-ai/pipeline-backend/pkg/errors" pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" ) @@ -106,6 +104,7 @@ type Repository interface { GetPaginatedPipelineRunsWithPermissions(ctx context.Context, requesterUID, pipelineUID string, page, pageSize int, filter filtering.Filter, order ordering.OrderBy, isOwner bool) ([]datamodel.PipelineRun, int64, error) GetPaginatedComponentRunsByPipelineRunIDWithPermissions(ctx context.Context, pipelineRunID string, page, pageSize int, filter filtering.Filter, order ordering.OrderBy) ([]datamodel.ComponentRun, int64, error) + GetPaginatedPipelineRunsByRequester(ctx context.Context, params GetPipelineRunsByRequesterParams) ([]datamodel.PipelineRun, int64, error) } type repository struct { @@ -1130,7 +1129,7 @@ func (r *repository) AddPipelineClones(ctx context.Context, pipelineUID uuid.UUI func (r *repository) GetPipelineRunByUID(ctx context.Context, pipelineTriggerUID uuid.UUID) (*datamodel.PipelineRun, error) { pipelineRun := &datamodel.PipelineRun{PipelineTriggerUID: pipelineTriggerUID} - err := r.db.First(pipelineRun).Error + err := r.db.Preload(clause.Associations).First(pipelineRun).Error if err != nil { return nil, err } @@ -1271,6 +1270,71 @@ func (r *repository) GetPaginatedComponentRunsByPipelineRunIDWithPermissions(ctx return componentRuns, totalRows, nil } +type GetPipelineRunsByRequesterParams struct { + RequesterUID string + StartTimeBegin time.Time + StartTimeEnd time.Time + Page int + PageSize int + Filter filtering.Filter + Order ordering.OrderBy +} + +func (r *repository) GetPaginatedPipelineRunsByRequester(ctx context.Context, params GetPipelineRunsByRequesterParams) ([]datamodel.PipelineRun, int64, error) { + var pipelineRuns []datamodel.PipelineRun + var totalRows int64 + + whereConditions := []string{"namespace = ? and started_time >= ? and started_time <= ?"} + whereArgs := []any{params.RequesterUID, params.StartTimeBegin, params.StartTimeEnd} + + var expr *clause.Expr + var err error + if expr, err = r.TranspileFilter(params.Filter); err != nil { + return nil, 0, err + } + if expr != nil { + whereConditions = append(whereConditions, "(?)") + whereArgs = append(whereArgs, expr) + } + + var where string + if len(whereConditions) > 0 { + where = strings.Join(whereConditions, " and ") + } + + err = r.db.Model(&datamodel.PipelineRun{}). + Where(where, whereArgs...). + Count(&totalRows).Error + if err != nil { + return nil, 0, err + } + + queryBuilder := r.db.Preload(clause.Associations).Where(where, whereArgs...) + + order := params.Order + if len(order.Fields) == 0 { + order.Fields = append(order.Fields, ordering.Field{ + Path: "started_time", + Desc: true, + }) + } + + for _, field := range order.Fields { + orderString := strcase.ToSnake(field.Path) + transformBoolToDescString(field.Desc) + queryBuilder.Order(orderString) + } + + // Retrieve paginated results with permissions + err = queryBuilder. + Offset(params.Page * params.PageSize).Limit(params.PageSize). + Find(&pipelineRuns).Error + if err != nil { + return nil, 0, err + } + + return pipelineRuns, totalRows, nil +} + func (r *repository) CreateNamespaceConnection(ctx context.Context, conn *datamodel.Connection) (*datamodel.Connection, error) { db := r.db.WithContext(ctx) diff --git a/pkg/repository/repository_test.go b/pkg/repository/repository_test.go index 1da610a46..e61c34db1 100644 --- a/pkg/repository/repository_test.go +++ b/pkg/repository/repository_test.go @@ -480,6 +480,7 @@ func TestRepository_UpsertPipelineRun(t *testing.T) { c.Check(got1.PipelineUID, qt.Equals, p.UID) c.Check(got1.Status, qt.Equals, pipelineRun.Status) c.Check(got1.Source, qt.Equals, pipelineRun.Source) + c.Check(got1.Pipeline.UID, qt.Equals, p.UID) componentRun := &datamodel.ComponentRun{ PipelineTriggerUID: pipelineRun.PipelineTriggerUID, @@ -616,5 +617,138 @@ func TestRepository_GetPaginatedPipelineRunsWithPermissions(t *testing.T) { } }) } +} + +func TestRepository_GetPaginatedPipelineRunsByCreditOwner(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + + cache, _ := redismock.NewClientMock() + + t0 := time.Now().UTC() + + mockUIDs := make([]uuid.UUID, 5) + for i := range len(mockUIDs) { + mockUIDs[i] = uuid.Must(uuid.NewV4()) + } + user1 := mockUIDs[0].String() + namespace1 := mockUIDs[1].String() + now := time.Now() + + pipelineUID, ownerUID := mockUIDs[2], mockUIDs[3] + pipelineUID2 := mockUIDs[4] + ownerPermalink := "users/" + ownerUID.String() + pipelineID := "test" + pipelineID2 := "test2" + + tx := db.Begin() + c.Cleanup(func() { tx.Rollback() }) + + repo := NewRepository(tx, cache) + + p := &datamodel.Pipeline{ + Owner: ownerPermalink, + ID: pipelineID, + BaseDynamic: datamodel.BaseDynamic{ + UID: pipelineUID, + CreateTime: t0, + UpdateTime: t0, + }, + } + err := repo.CreateNamespacePipeline(ctx, p) + c.Check(err, qt.IsNil) + + p2 := &datamodel.Pipeline{ + Owner: ownerPermalink, + ID: pipelineID2, + BaseDynamic: datamodel.BaseDynamic{ + UID: pipelineUID2, + CreateTime: t0, + UpdateTime: t0, + }, + } + err = repo.CreateNamespacePipeline(ctx, p2) + c.Check(err, qt.IsNil) + + got, err := repo.GetNamespacePipelineByID(ctx, ownerPermalink, pipelineID, true, false) + c.Check(err, qt.IsNil) + c.Check(got.NumberOfRuns, qt.Equals, 0) + c.Check(got.LastRunTime.IsZero(), qt.IsTrue) + got, err = repo.GetNamespacePipelineByID(ctx, ownerPermalink, pipelineID2, true, false) + c.Check(err, qt.IsNil) + c.Check(got.NumberOfRuns, qt.Equals, 0) + c.Check(got.LastRunTime.IsZero(), qt.IsTrue) + + pipelineRun := &datamodel.PipelineRun{ + PipelineTriggerUID: uuid.Must(uuid.NewV4()), + PipelineUID: p.UID, + Status: datamodel.RunStatus(runpb.RunStatus_RUN_STATUS_PROCESSING), + Source: datamodel.RunSource(runpb.RunSource_RUN_SOURCE_API), + TriggeredBy: user1, + Namespace: namespace1, + StartedTime: now.Add(-1 * time.Hour), + TotalDuration: null.IntFrom(42), + Components: nil, + } + + err = repo.UpsertPipelineRun(ctx, pipelineRun) + c.Check(err, qt.IsNil) + + resp, _, err := repo.GetPaginatedPipelineRunsByRequester(ctx, GetPipelineRunsByRequesterParams{ + RequesterUID: namespace1, + StartTimeBegin: now.Add(-3 * time.Hour), + StartTimeEnd: now.Add(-2 * time.Hour), + Page: 0, + PageSize: 10, + Filter: filtering.Filter{}, + Order: ordering.OrderBy{}, + }) + c.Check(err, qt.IsNil) + c.Check(resp, qt.HasLen, 0) + + resp, _, err = repo.GetPaginatedPipelineRunsByRequester(ctx, GetPipelineRunsByRequesterParams{ + RequesterUID: namespace1, + StartTimeBegin: now.Add(-2 * time.Hour), + StartTimeEnd: now, + Page: 0, + PageSize: 10, + Filter: filtering.Filter{}, + Order: ordering.OrderBy{}, + }) + c.Check(err, qt.IsNil) + c.Check(resp, qt.HasLen, 1) + c.Check(resp[0].PipelineTriggerUID, qt.Equals, pipelineRun.PipelineTriggerUID) + c.Check(resp[0].Pipeline.ID, qt.Equals, p.ID) + + pipelineRun2 := &datamodel.PipelineRun{ + PipelineTriggerUID: uuid.Must(uuid.NewV4()), + PipelineUID: p2.UID, + Status: datamodel.RunStatus(runpb.RunStatus_RUN_STATUS_PROCESSING), + Source: datamodel.RunSource(runpb.RunSource_RUN_SOURCE_API), + TriggeredBy: user1, + Namespace: namespace1, + StartedTime: now.Add(-1 * time.Hour), + TotalDuration: null.IntFrom(42), + Components: nil, + } + + err = repo.UpsertPipelineRun(ctx, pipelineRun2) + c.Check(err, qt.IsNil) + + resp, _, err = repo.GetPaginatedPipelineRunsByRequester(ctx, GetPipelineRunsByRequesterParams{ + RequesterUID: namespace1, + StartTimeBegin: now.Add(-2 * time.Hour), + StartTimeEnd: now, + Page: 0, + PageSize: 10, + Filter: filtering.Filter{}, + Order: ordering.OrderBy{}, + }) + c.Check(err, qt.IsNil) + c.Check(resp, qt.HasLen, 2) + c.Check(resp[0].PipelineTriggerUID, qt.Equals, pipelineRun.PipelineTriggerUID) + c.Check(resp[0].Pipeline.ID, qt.Equals, p.ID) + c.Check(resp[1].PipelineTriggerUID, qt.Equals, pipelineRun2.PipelineTriggerUID) + c.Check(resp[1].Pipeline.ID, qt.Equals, p2.ID) } diff --git a/pkg/service/main.go b/pkg/service/main.go index 7dcfe29a7..4246fd440 100644 --- a/pkg/service/main.go +++ b/pkg/service/main.go @@ -78,6 +78,7 @@ type Service interface { ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRunsRequest, filter filtering.Filter) (*pb.ListPipelineRunsResponse, error) ListComponentRuns(ctx context.Context, req *pb.ListComponentRunsRequest, filter filtering.Filter) (*pb.ListComponentRunsResponse, error) + ListPipelineRunsByRequester(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error) GetIntegration(_ context.Context, id string, _ pb.View) (*pb.Integration, error) ListIntegrations(context.Context, *pb.ListIntegrationsRequest) (*pb.ListIntegrationsResponse, error) diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index 77940e6d1..9ab960a2f 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -36,6 +36,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/logger" "github.com/instill-ai/pipeline-backend/pkg/recipe" + "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/resource" "github.com/instill-ai/pipeline-backend/pkg/utils" "github.com/instill-ai/pipeline-backend/pkg/worker" @@ -1953,3 +1954,88 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pipelinepb.ListCom PageSize: int32(pageSize), }, nil } + +func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pipelinepb.ListPipelineRunsByCreditOwnerRequest) (*pipelinepb.ListPipelineRunsByCreditOwnerResponse, error) { + page := s.pageInRange(req.GetPage()) + pageSize := s.pageSizeInRange(req.GetPageSize()) + requesterUID, _ := utils.GetRequesterUIDAndUserUID(ctx) + + declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ + filtering.DeclareStandardFunctions(), + filtering.DeclareIdent("status", filtering.TypeString), + filtering.DeclareIdent("source", filtering.TypeString), + }...) + if err != nil { + return nil, err + } + + filter, err := filtering.ParseFilter(req, declarations) + if err != nil { + return nil, err + } + + orderBy, err := ordering.ParseOrderBy(req) + if err != nil { + return nil, err + } + + now := time.Now().UTC() + startedTimeBegin := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + if req.GetStart().IsValid() { + startedTimeBegin = req.GetStart().AsTime() + } + startedTimeEnd := now + if req.GetStop().IsValid() { + startedTimeEnd = req.GetStop().AsTime() + } + + if startedTimeBegin.After(startedTimeEnd) { + return nil, fmt.Errorf("time range end time is earlier than start time") + } + + pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsByRequester(ctx, repository.GetPipelineRunsByRequesterParams{ + RequesterUID: requesterUID, + StartTimeBegin: startedTimeBegin, + StartTimeEnd: startedTimeEnd, + Page: page, + PageSize: pageSize, + Filter: filter, + Order: orderBy, + }) + if err != nil { + return nil, fmt.Errorf("getting pipeline runs by requester: %w", err) + } + + requesterIDMap := make(map[string]struct{}) + for _, pipelineRun := range pipelineRuns { + requesterIDMap[pipelineRun.TriggeredBy] = struct{}{} + } + + runnerMap := make(map[string]*string) + for requesterID := range requesterIDMap { + runner, err := s.mgmtPrivateServiceClient.CheckNamespaceByUIDAdmin(ctx, &mgmtpb.CheckNamespaceByUIDAdminRequest{Uid: requesterID}) + if err != nil { + return nil, err + } + runnerMap[requesterID] = &runner.Id + } + + pbPipelineRuns := make([]*pipelinepb.PipelineRun, len(pipelineRuns)) + + var pbRun *pipelinepb.PipelineRun + for i, run := range pipelineRuns { + pbRun, err = s.convertPipelineRunToPB(run) + if err != nil { + return nil, fmt.Errorf("converting pipeline run: %w", err) + } + pbRun.RunnerId = runnerMap[run.TriggeredBy] + pbPipelineRuns[i] = pbRun + } + + return &pipelinepb.ListPipelineRunsByCreditOwnerResponse{ + PipelineRuns: pbPipelineRuns, + TotalSize: int32(totalCount), + Page: int32(page), + PageSize: int32(pageSize), + }, nil +} diff --git a/pkg/service/utils.go b/pkg/service/utils.go index f5795f41a..e5ee808fa 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -101,6 +101,7 @@ func (s *service) GetRscNamespace(ctx context.Context, namespaceID string) (reso func (s *service) convertPipelineRunToPB(run datamodel.PipelineRun) (*pipelinepb.PipelineRun, error) { result := &pipelinepb.PipelineRun{ PipelineUid: run.PipelineUID.String(), + PipelineId: &run.Pipeline.ID, PipelineRunUid: run.PipelineTriggerUID.String(), PipelineVersion: run.PipelineVersion, Status: runpb.RunStatus(run.Status),