Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runner: cron job for asset deletion #233

Merged
merged 7 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/task-runner/task-runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ func Run(build BuildFlags) {
return nil
})

eg.Go(func() error {
glog.Info("Starting cron job for deletion...")
err := runner.CronJobForAssetDeletion(ctx)
if err != nil {
return fmt.Errorf("cron job for deletion error: %w", err)
}
return nil
})

if err := eg.Wait(); err != nil {
glog.Fatalf("Fatal error on task-runner: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/golang/glog v1.2.0
github.com/julienschmidt/httprouter v1.3.0
github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4
github.com/livepeer/go-api-client v0.4.17
github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c
github.com/livepeer/go-tools v0.3.2
github.com/livepeer/livepeer-data v0.7.5-0.20230927031152-b938ac1dc665
github.com/peterbourgon/ff v1.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4 h1:UfiMdEDGa88yqYD9+i1+ldAex9Kf1+3jbq+wBrmZccM=
github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4/go.mod h1:Ybiub5AGDrDfvyh1MWdIa551LAwhx/6lSpbQlgb1W1Q=
github.com/livepeer/go-api-client v0.4.17 h1:/VKSUTn0hP8dHuAyA2He0kf/f0Av9iOLFnAeNStol6s=
github.com/livepeer/go-api-client v0.4.17/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c h1:KPGkwuKvAbHCADy3hssTCfJVh0wxYMlXVTehEikljCc=
github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-tools v0.3.2 h1:5pOUrOmkkGbbcWnpCt2yrSD6cD85G4GcpO4B25NpMJM=
github.com/livepeer/go-tools v0.3.2/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/livepeer-data v0.7.5-0.20230927031152-b938ac1dc665 h1:EXlI922Fsv9lyIw1LQ7pZN6slCuYya8NQrCFWN8INg4=
Expand Down
132 changes: 132 additions & 0 deletions task/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
Start() error
HandleCatalysis(ctx context.Context, taskId, nextStep, attemptID string, callback *clients.CatalystCallback) error
Shutdown(ctx context.Context) error
CronJobForAssetDeletion(ctx context.Context) error
}

type RunnerOptions struct {
Expand Down Expand Up @@ -545,6 +546,137 @@
return r.amqp.Shutdown(ctx)
}

func (r *runner) UnpinFromIpfs(ctx context.Context, cid string, filter string) error {
assets, _, err := r.lapi.ListAssets(api.ListOptions{
Filters: map[string]interface{}{
filter: cid,
},
AllUsers: true,
})

Check warning on line 555 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L549-L555

Added lines #L549 - L555 were not covered by tests

if err != nil {
return err

Check warning on line 558 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L557-L558

Added lines #L557 - L558 were not covered by tests
}

if len(assets) == 1 {
return r.ipfs.Unpin(ctx, cid)

Check warning on line 562 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L561-L562

Added lines #L561 - L562 were not covered by tests
}

return nil

Check warning on line 565 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L565

Added line #L565 was not covered by tests
}

func (r *runner) CronJobForAssetDeletion(ctx context.Context) error {

Check warning on line 568 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L568

Added line #L568 was not covered by tests
// Loop every hour to delete assets
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

Check warning on line 571 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L570-L571

Added lines #L570 - L571 were not covered by tests

glog.Infof("Starting asset deletion cron job")

Check warning on line 573 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L573

Added line #L573 was not covered by tests

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
assets, err := r.lapi.GetDeletingAssets()
if err != nil {
glog.Errorf("Error retrieving assets for deletion: %v", err)
continue

Check warning on line 583 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L575-L583

Added lines #L575 - L583 were not covered by tests
}
for _, asset := range assets {
err := deleteAsset(asset, r, ctx)
if err != nil {
glog.Errorf("Error deleting asset %v: %v", asset.ID, err)

Check warning on line 588 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L585-L588

Added lines #L585 - L588 were not covered by tests
}
}
}
}
}

func deleteAsset(asset *api.Asset, r *runner, ctx context.Context) error {

Check warning on line 595 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L595

Added line #L595 was not covered by tests

objectStore, err := r.lapi.GetObjectStore(asset.ObjectStoreID)
if err != nil {
return err

Check warning on line 599 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L597-L599

Added lines #L597 - L599 were not covered by tests
}
osDriver, err := drivers.ParseOSURL(objectStore.URL, true)
if err != nil {
return UnretriableError{fmt.Errorf("error parsing object store url=%s: %w", objectStore.URL, err)}

Check warning on line 603 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L601-L603

Added lines #L601 - L603 were not covered by tests
}
assetOS := osDriver.NewSession("")

Check warning on line 605 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L605

Added line #L605 was not covered by tests

if err != nil {
glog.Errorf("Error getting asset object store session: %v", err)
return err

Check warning on line 609 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L607-L609

Added lines #L607 - L609 were not covered by tests
}

directory := asset.PlaybackID
var totalDeleted int

Check warning on line 613 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L612-L613

Added lines #L612 - L613 were not covered by tests

// Initially list files
pi, err := assetOS.ListFiles(ctx, directory, "/")
glog.Infof("Found %v files for asset %v", len(pi.Files()), asset.ID)
if err != nil {
glog.Errorf("Error listing files for asset %v: %v", asset.ID, err)
return err

Check warning on line 620 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L616-L620

Added lines #L616 - L620 were not covered by tests
}

isErr := false

Check warning on line 623 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L623

Added line #L623 was not covered by tests

for pi != nil {
for _, file := range pi.Files() {
glog.Infof("Found file %v", file.Name)
err := assetOS.DeleteFile(ctx, file.Name)
if err != nil {
glog.Errorf("Error deleting file %v: %v", file.Name, err)
isErr = true
continue

Check warning on line 632 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L625-L632

Added lines #L625 - L632 were not covered by tests
}
glog.Infof("Deleted file %v", file.Name)
totalDeleted++

Check warning on line 635 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L634-L635

Added lines #L634 - L635 were not covered by tests
}

if pi.HasNextPage() {
pi, err = pi.NextPage()
if err != nil {
glog.Errorf("Failed to load next page of files for asset %v: %v", asset.ID, err)
isErr = true
break

Check warning on line 643 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L638-L643

Added lines #L638 - L643 were not covered by tests
}
} else {
break // No more pages

Check warning on line 646 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L645-L646

Added lines #L645 - L646 were not covered by tests
}
}

glog.Infof("Deleted %v files from asset=%v", totalDeleted, asset.ID)

Check warning on line 650 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L650

Added line #L650 was not covered by tests

if ipfs := asset.AssetSpec.Storage.IPFS; ipfs != nil {
err = r.UnpinFromIpfs(ctx, ipfs.CID, "cid")
if err != nil {
glog.Errorf("Error unpinning from IPFS %v", ipfs.CID)
return err

Check warning on line 656 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L652-L656

Added lines #L652 - L656 were not covered by tests
}
err = r.UnpinFromIpfs(ctx, ipfs.NFTMetadata.CID, "nftMetadataCid")
if err != nil {
glog.Errorf("Error unpinning metadata from IPFS %v", ipfs.NFTMetadata.CID)
return err

Check warning on line 661 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L658-L661

Added lines #L658 - L661 were not covered by tests
}

glog.Infof("Unpinned asset=%v from IPFS", asset.ID)

Check warning on line 664 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L664

Added line #L664 was not covered by tests
}

if isErr {
return errors.New("error deleting files")

Check warning on line 668 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L667-L668

Added lines #L667 - L668 were not covered by tests
}

err = r.lapi.FlagAssetAsDeleted(asset.ID)
if err != nil {
glog.Errorf("Error flagging asset as deleted: %v", err)
return err

Check warning on line 674 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L671-L674

Added lines #L671 - L674 were not covered by tests
}

return nil

Check warning on line 677 in task/runner.go

View check run for this annotation

Codecov / codecov/patch

task/runner.go#L677

Added line #L677 was not covered by tests
}

type taskErrInfo struct {
RawError, HumanError error
Unretriable bool
Expand Down
Loading