Skip to content

Commit

Permalink
feat(run): run logging data list by credit owner API
Browse files Browse the repository at this point in the history
  • Loading branch information
joremysh committed Oct 12, 2024
1 parent 9702da4 commit 3f4ab27
Show file tree
Hide file tree
Showing 13 changed files with 819 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/iFaceless/godub v0.0.0-20200728093528-a30bb4d1a0f1
github.com/iancoleman/strcase v0.3.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241011110226-54b40c98b2f0
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.5.0-alpha
github.com/itchyny/gojq v0.12.14
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,8 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408 h1:GTx0g6dXxd7WUm4bvJK1N+j8zuxIaSoXD5AZ2Vl+FXI=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241007141417-545e5d187408/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241011110226-54b40c98b2f0 h1:1h67FkayaXU/ona9m1VZjrQ8Wj8NR05ytxFvUSjIEi4=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241011110226-54b40c98b2f0/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.5.0-alpha h1:xIeIvrLzwJYOBmZwOePVFVkKGEcMDqtHmn1cfVVNlIE=
Expand Down
1 change: 1 addition & 0 deletions pkg/datamodel/runlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (j *JSONB) Scan(value interface{}) error {
type PipelineRun struct {
PipelineTriggerUID uuid.UUID `gorm:"primaryKey" json:"pipeline-trigger-uid"` // Unique identifier for each run
PipelineUID uuid.UUID `gorm:"type:uuid;index" json:"pipeline-uid"` // Pipeline unique ID used in the run
Pipeline Pipeline `gorm:"foreignKey:PipelineUID;references:UID"` // Pipeline instance referenced in the run
PipelineVersion string `gorm:"type:varchar(255)" json:"pipeline-version"` // Pipeline version used in the run
Status RunStatus `gorm:"type:valid_trigger_status;index" json:"status"` // Current status of the run (e.g., Running, Completed, Failed)
Source RunSource `gorm:"type:valid_trigger_source" json:"source"` // Origin of the run (e.g., Web click, API)
Expand Down
34 changes: 34 additions & 0 deletions pkg/handler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,3 +1996,37 @@ func (h *PublicHandler) ListComponentRuns(ctx context.Context, req *pb.ListCompo

return resp, nil
}

func (h *PublicHandler) ListPipelineRunsByCreditOwner(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest) (*pb.ListPipelineRunsByCreditOwnerResponse, error) {
logger, _ := logger.GetZapLogger(ctx)
logUUID, _ := uuid.NewV4()
logger.Info("ListPipelineRunsByCreditOwner starts", zap.String("logUUID", logUUID.String()))

if req.GetStart().IsValid() && req.GetStop().IsValid() && req.GetStart().AsTime().After(req.GetStop().AsTime()) {
return nil, fmt.Errorf("input stop time earlier than start time")
}

declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{
filtering.DeclareStandardFunctions(),
filtering.DeclareIdent("status", filtering.TypeString),
filtering.DeclareIdent("source", filtering.TypeString),
}...)
if err != nil {
return nil, err
}

filter, err := filtering.ParseFilter(req, declarations)
if err != nil {
return nil, err
}

resp, err := h.service.ListPipelineRunsByCreditOwner(ctx, req, filter)
if err != nil {
logger.Error("failed in ListPipelineRunsByCreditOwner", zap.String("logUUID", logUUID.String()), zap.Error(err))
return nil, status.Error(codes.Internal, "Failed to list pipeline runs")
}

logger.Info("ListPipelineRunsByCreditOwner finished", zap.String("logUUID", logUUID.String()))

return resp, nil
}
553 changes: 550 additions & 3 deletions pkg/mock/repository_mock.gen.go

Large diffs are not rendered by default.

57 changes: 56 additions & 1 deletion pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type Repository interface {

GetPaginatedPipelineRunsWithPermissions(ctx context.Context, requesterUID, pipelineUID string, page, pageSize int, filter filtering.Filter, order ordering.OrderBy, isOwner bool) ([]datamodel.PipelineRun, int64, error)
GetPaginatedComponentRunsByPipelineRunIDWithPermissions(ctx context.Context, pipelineRunID string, page, pageSize int, filter filtering.Filter, order ordering.OrderBy) ([]datamodel.ComponentRun, int64, error)
GetPaginatedPipelineRunsByCreditOwner(ctx context.Context, requesterUID string, startTime, endTime time.Time, page, pageSize int, filter filtering.Filter, order ordering.OrderBy) ([]datamodel.PipelineRun, int64, error)
}

type repository struct {
Expand Down Expand Up @@ -1130,7 +1131,7 @@ func (r *repository) AddPipelineClones(ctx context.Context, pipelineUID uuid.UUI

func (r *repository) GetPipelineRunByUID(ctx context.Context, pipelineTriggerUID uuid.UUID) (*datamodel.PipelineRun, error) {
pipelineRun := &datamodel.PipelineRun{PipelineTriggerUID: pipelineTriggerUID}
err := r.db.First(pipelineRun).Error
err := r.db.Preload(clause.Associations).First(pipelineRun).Error
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1271,6 +1272,60 @@ func (r *repository) GetPaginatedComponentRunsByPipelineRunIDWithPermissions(ctx
return componentRuns, totalRows, nil
}

func (r *repository) GetPaginatedPipelineRunsByCreditOwner(ctx context.Context, requesterUID string, startTimeBegin, startTimeEnd time.Time, page, pageSize int, filter filtering.Filter, order ordering.OrderBy) ([]datamodel.PipelineRun, int64, error) {
var pipelineRuns []datamodel.PipelineRun
var totalRows int64

whereConditions := []string{"namespace = ? and started_time >= ? and started_time <= ?"}
whereArgs := []any{requesterUID, startTimeBegin, startTimeEnd}

var expr *clause.Expr
var err error
if expr, err = r.TranspileFilter(filter); err != nil {
return nil, 0, err
}
if expr != nil {
whereConditions = append(whereConditions, "(?)")
whereArgs = append(whereArgs, expr)
}

var where string
if len(whereConditions) > 0 {
where = strings.Join(whereConditions, " and ")
}

err = r.db.Model(&datamodel.PipelineRun{}).
Where(where, whereArgs...).
Count(&totalRows).Error
if err != nil {
return nil, 0, err
}

queryBuilder := r.db.Preload(clause.Associations).Where(where, whereArgs...)

if len(order.Fields) == 0 {
order.Fields = append(order.Fields, ordering.Field{
Path: "started_time",
Desc: true,
})
}

for _, field := range order.Fields {
orderString := strcase.ToSnake(field.Path) + transformBoolToDescString(field.Desc)
queryBuilder.Order(orderString)
}

// Retrieve paginated results with permissions
err = queryBuilder.
Offset(page * pageSize).Limit(pageSize).
Find(&pipelineRuns).Error
if err != nil {
return nil, 0, err
}

return pipelineRuns, totalRows, nil
}

func (r *repository) CreateNamespaceConnection(ctx context.Context, conn *datamodel.Connection) (*datamodel.Connection, error) {
db := r.db.WithContext(ctx)

Expand Down
114 changes: 114 additions & 0 deletions pkg/repository/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-redis/redismock/v9"
"github.com/gofrs/uuid"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
"go.einride.tech/aip/filtering"
"go.einride.tech/aip/ordering"
"go.uber.org/zap"
Expand All @@ -32,6 +33,7 @@ import (
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
database "github.com/instill-ai/pipeline-backend/pkg/db"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

runpb "github.com/instill-ai/protogen-go/common/run/v1alpha"
pipelinepb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand Down Expand Up @@ -480,6 +482,7 @@ func TestRepository_UpsertPipelineRun(t *testing.T) {
c.Check(got1.PipelineUID, qt.Equals, p.UID)
c.Check(got1.Status, qt.Equals, pipelineRun.Status)
c.Check(got1.Source, qt.Equals, pipelineRun.Source)
c.Check(got1.Pipeline.UID, qt.Equals, p.UID)

componentRun := &datamodel.ComponentRun{
PipelineTriggerUID: pipelineRun.PipelineTriggerUID,
Expand Down Expand Up @@ -611,10 +614,121 @@ func TestRepository_GetPaginatedPipelineRunsWithPermissions(t *testing.T) {
c.Assert(err, qt.IsNil)
if testCase.canView {
c.Check(len(response), qt.Equals, 1)
// c.Log(response[0].Pipeline.ID)
// c.Check(response[0].Pipeline.ID, qt.Equals, p.ID)
} else {
c.Check(len(response), qt.Equals, 0)
}
})
}
}

func TestRepository_GetPaginatedPipelineRunsByCreditOwner(t *testing.T) {
c := qt.New(t)
ctx := context.Background()

cache, _ := redismock.NewClientMock()

t0 := time.Now().UTC()

mockUIDs := make([]uuid.UUID, 5)
for i := range len(mockUIDs) {
mockUIDs[i] = uuid.Must(uuid.NewV4())
}
user1 := mockUIDs[0].String()
namespace1 := mockUIDs[1].String()
now := time.Now()

pipelineUID, ownerUID := mockUIDs[2], mockUIDs[3]
pipelineUID2 := mockUIDs[4]
ownerPermalink := "users/" + ownerUID.String()
pipelineID := "test"
pipelineID2 := "test2"

tx := db.Begin()
c.Cleanup(func() { tx.Rollback() })

repo := NewRepository(tx, cache)

p := &datamodel.Pipeline{
Owner: ownerPermalink,
ID: pipelineID,
BaseDynamic: datamodel.BaseDynamic{
UID: pipelineUID,
CreateTime: t0,
UpdateTime: t0,
},
}
err := repo.CreateNamespacePipeline(ctx, p)
require.NoError(t, err)

p2 := &datamodel.Pipeline{
Owner: ownerPermalink,
ID: pipelineID2,
BaseDynamic: datamodel.BaseDynamic{
UID: pipelineUID2,
CreateTime: t0,
UpdateTime: t0,
},
}
err = repo.CreateNamespacePipeline(ctx, p2)
require.NoError(t, err)

got, err := repo.GetNamespacePipelineByID(ctx, ownerPermalink, pipelineID, true, false)
require.NoError(t, err)
require.Zero(t, got.NumberOfRuns)
require.True(t, got.LastRunTime.IsZero())

got, err = repo.GetNamespacePipelineByID(ctx, ownerPermalink, pipelineID2, true, false)
require.NoError(t, err)
require.Zero(t, got.NumberOfRuns)
require.True(t, got.LastRunTime.IsZero())

pipelineRun := &datamodel.PipelineRun{
PipelineTriggerUID: uuid.Must(uuid.NewV4()),
PipelineUID: p.UID,
Status: datamodel.RunStatus(runpb.RunStatus_RUN_STATUS_PROCESSING),
Source: datamodel.RunSource(runpb.RunSource_RUN_SOURCE_API),
TriggeredBy: user1,
Namespace: namespace1,
StartedTime: now.Add(-1 * time.Hour),
TotalDuration: null.IntFrom(42),
Components: nil,
}

err = repo.UpsertPipelineRun(ctx, pipelineRun)
require.NoError(t, err)

resp, _, err := repo.GetPaginatedPipelineRunsByCreditOwner(ctx, namespace1, now.Add(-3*time.Hour), now.Add(-2*time.Hour), 0, 10, filtering.Filter{}, ordering.OrderBy{})
require.NoError(t, err)
require.Len(t, resp, 0)

resp, _, err = repo.GetPaginatedPipelineRunsByCreditOwner(ctx, namespace1, now.Add(-2*time.Hour), now, 0, 10, filtering.Filter{}, ordering.OrderBy{})
require.NoError(t, err)
require.Len(t, resp, 1)
require.Equal(t, resp[0].PipelineTriggerUID, pipelineRun.PipelineTriggerUID)
require.Equal(t, resp[0].Pipeline.ID, p.ID)

pipelineRun2 := &datamodel.PipelineRun{
PipelineTriggerUID: uuid.Must(uuid.NewV4()),
PipelineUID: p2.UID,
Status: datamodel.RunStatus(runpb.RunStatus_RUN_STATUS_PROCESSING),
Source: datamodel.RunSource(runpb.RunSource_RUN_SOURCE_API),
TriggeredBy: user1,
Namespace: namespace1,
StartedTime: now.Add(-1 * time.Hour),
TotalDuration: null.IntFrom(42),
Components: nil,
}

err = repo.UpsertPipelineRun(ctx, pipelineRun2)
require.NoError(t, err)

resp, _, err = repo.GetPaginatedPipelineRunsByCreditOwner(ctx, namespace1, now.Add(-2*time.Hour), now, 0, 10, filtering.Filter{}, ordering.OrderBy{})
require.NoError(t, err)
require.Len(t, resp, 2)
require.Equal(t, resp[0].PipelineTriggerUID, pipelineRun.PipelineTriggerUID)
require.Equal(t, resp[0].Pipeline.ID, p.ID)
require.Equal(t, resp[1].PipelineTriggerUID, pipelineRun2.PipelineTriggerUID)
require.Equal(t, resp[1].Pipeline.ID, p2.ID)
}
4 changes: 3 additions & 1 deletion pkg/service/component_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (

"github.com/instill-ai/pipeline-backend/pkg/recipe"
"github.com/instill-ai/pipeline-backend/pkg/repository"
"github.com/instill-ai/x/paginate"

component "github.com/instill-ai/pipeline-backend/pkg/component/store"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

"github.com/instill-ai/x/paginate"

pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)

Expand Down
1 change: 1 addition & 0 deletions pkg/service/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
componentbase "github.com/instill-ai/pipeline-backend/pkg/component/base"
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/resource"

componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"

mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand Down Expand Up @@ -78,6 +79,7 @@ type Service interface {

ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRunsRequest, filter filtering.Filter) (*pb.ListPipelineRunsResponse, error)
ListComponentRuns(ctx context.Context, req *pb.ListComponentRunsRequest, filter filtering.Filter) (*pb.ListComponentRunsResponse, error)
ListPipelineRunsByCreditOwner(ctx context.Context, req *pb.ListPipelineRunsByCreditOwnerRequest, filter filtering.Filter) (*pb.ListPipelineRunsByCreditOwnerResponse, error)

GetIntegration(_ context.Context, id string, _ pb.View) (*pb.Integration, error)
ListIntegrations(context.Context, *pb.ListIntegrationsRequest) (*pb.ListIntegrationsResponse, error)
Expand Down
51 changes: 50 additions & 1 deletion pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/utils"
"github.com/instill-ai/pipeline-backend/pkg/worker"

componentbase "github.com/instill-ai/pipeline-backend/pkg/component/base"
errdomain "github.com/instill-ai/pipeline-backend/pkg/errors"

"github.com/instill-ai/x/errmsg"

componentbase "github.com/instill-ai/pipeline-backend/pkg/component/base"
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pipelinepb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)
Expand Down Expand Up @@ -1988,3 +1988,52 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pipelinepb.ListCom
PageSize: int32(pageSize),
}, nil
}

func (s *service) ListPipelineRunsByCreditOwner(ctx context.Context, req *pipelinepb.ListPipelineRunsByCreditOwnerRequest,
filter filtering.Filter) (*pipelinepb.ListPipelineRunsByCreditOwnerResponse, error) {
page := s.pageInRange(req.GetPage())
pageSize := s.pageSizeInRange(req.GetPageSize())
requesterUID, _ := utils.GetRequesterUIDAndUserUID(ctx)

log, _ := logger.GetZapLogger(ctx)

orderBy, err := ordering.ParseOrderBy(req)
if err != nil {
return nil, err
}

now := time.Now().UTC()
startedTimeBegin := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
if req.GetStart().IsValid() {
startedTimeBegin = req.GetStart().AsTime()
}
startedTimeEnd := now
if req.GetStop().IsValid() {
startedTimeEnd = req.GetStop().AsTime()
}
log.Info("ListPipelineRunsByCreditOwner", zap.Time("startedTimeBegin", startedTimeBegin), zap.Time("startedTimeEnd", startedTimeEnd))

pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsByCreditOwner(ctx, requesterUID, startedTimeBegin, startedTimeEnd,
page, pageSize, filter, orderBy)
if err != nil {
return nil, fmt.Errorf("failed to get pipeline runs: %w", err)
}

pbPipelineRuns := make([]*pipelinepb.PipelineRun, len(pipelineRuns))

var pbRun *pipelinepb.PipelineRun
for i, run := range pipelineRuns {
pbRun, err = s.convertPipelineRunToPB(run)
if err != nil {
return nil, fmt.Errorf("failed to convert pipeline run: %w", err)
}
pbPipelineRuns[i] = pbRun
}

return &pipelinepb.ListPipelineRunsByCreditOwnerResponse{
PipelineRuns: pbPipelineRuns,
TotalSize: totalCount,
Page: int32(page),
PageSize: int32(pageSize),
}, nil
}
Loading

0 comments on commit 3f4ab27

Please sign in to comment.