diff --git a/cmd/task-runner/task-runner.go b/cmd/task-runner/task-runner.go index aed01cb..1e02df2 100644 --- a/cmd/task-runner/task-runner.go +++ b/cmd/task-runner/task-runner.go @@ -195,6 +195,15 @@ func Run(build BuildFlags) { return nil }) + eg.Go(func() error { + glog.Info("Starting cron job for deletion...") + err := runner.CronJobForAssetDeletion(ctx) + if err != nil { + return fmt.Errorf("cron job for deletion error: %w", err) + } + return nil + }) + if err := eg.Wait(); err != nil { glog.Fatalf("Fatal error on task-runner: %v", err) } diff --git a/go.mod b/go.mod index 65d802a..4273941 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/golang/glog v1.2.0 github.com/julienschmidt/httprouter v1.3.0 github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4 - github.com/livepeer/go-api-client v0.4.17 + github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c github.com/livepeer/go-tools v0.3.2 github.com/livepeer/livepeer-data v0.7.5-0.20230927031152-b938ac1dc665 github.com/peterbourgon/ff v1.7.1 diff --git a/go.sum b/go.sum index 0089f0a..d1b303f 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4 h1:UfiMdEDGa88yqYD9+i1+ldAex9Kf1+3jbq+wBrmZccM= github.com/livepeer/catalyst-api v0.1.2-0.20230925142340-c311569665b4/go.mod h1:Ybiub5AGDrDfvyh1MWdIa551LAwhx/6lSpbQlgb1W1Q= -github.com/livepeer/go-api-client v0.4.17 h1:/VKSUTn0hP8dHuAyA2He0kf/f0Av9iOLFnAeNStol6s= -github.com/livepeer/go-api-client v0.4.17/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c h1:KPGkwuKvAbHCADy3hssTCfJVh0wxYMlXVTehEikljCc= +github.com/livepeer/go-api-client v0.4.19-0.20240311145302-1abd53df256c/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-tools v0.3.2 h1:5pOUrOmkkGbbcWnpCt2yrSD6cD85G4GcpO4B25NpMJM= github.com/livepeer/go-tools v0.3.2/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/livepeer/livepeer-data v0.7.5-0.20230927031152-b938ac1dc665 h1:EXlI922Fsv9lyIw1LQ7pZN6slCuYya8NQrCFWN8INg4= diff --git a/task/runner.go b/task/runner.go index 5b3227f..f7bed7b 100644 --- a/task/runner.go +++ b/task/runner.go @@ -104,6 +104,7 @@ type Runner interface { Start() error HandleCatalysis(ctx context.Context, taskId, nextStep, attemptID string, callback *clients.CatalystCallback) error Shutdown(ctx context.Context) error + CronJobForAssetDeletion(ctx context.Context) error } type RunnerOptions struct { @@ -545,6 +546,137 @@ func (r *runner) Shutdown(ctx context.Context) error { return r.amqp.Shutdown(ctx) } +func (r *runner) UnpinFromIpfs(ctx context.Context, cid string, filter string) error { + assets, _, err := r.lapi.ListAssets(api.ListOptions{ + Filters: map[string]interface{}{ + filter: cid, + }, + AllUsers: true, + }) + + if err != nil { + return err + } + + if len(assets) == 1 { + return r.ipfs.Unpin(ctx, cid) + } + + return nil +} + +func (r *runner) CronJobForAssetDeletion(ctx context.Context) error { + // Loop every hour to delete assets + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + + glog.Infof("Starting asset deletion cron job") + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + assets, err := r.lapi.GetDeletingAssets() + if err != nil { + glog.Errorf("Error retrieving assets for deletion: %v", err) + continue + } + for _, asset := range assets { + err := deleteAsset(asset, r, ctx) + if err != nil { + glog.Errorf("Error deleting asset %v: %v", asset.ID, err) + } + } + } + } +} + +func deleteAsset(asset *api.Asset, r *runner, ctx context.Context) error { + + objectStore, err := r.lapi.GetObjectStore(asset.ObjectStoreID) + if err != nil { + return err + } + osDriver, err := drivers.ParseOSURL(objectStore.URL, true) + if err != nil { + return UnretriableError{fmt.Errorf("error parsing object store url=%s: %w", objectStore.URL, err)} + } + assetOS := osDriver.NewSession("") + + if err != nil { + glog.Errorf("Error getting asset object store session: %v", err) + return err + } + + directory := asset.PlaybackID + var totalDeleted int + + // Initially list files + pi, err := assetOS.ListFiles(ctx, directory, "/") + glog.Infof("Found %v files for asset %v", len(pi.Files()), asset.ID) + if err != nil { + glog.Errorf("Error listing files for asset %v: %v", asset.ID, err) + return err + } + + isErr := false + + for pi != nil { + for _, file := range pi.Files() { + glog.Infof("Found file %v", file.Name) + err := assetOS.DeleteFile(ctx, file.Name) + if err != nil { + glog.Errorf("Error deleting file %v: %v", file.Name, err) + isErr = true + continue + } + glog.Infof("Deleted file %v", file.Name) + totalDeleted++ + } + + if pi.HasNextPage() { + pi, err = pi.NextPage() + if err != nil { + glog.Errorf("Failed to load next page of files for asset %v: %v", asset.ID, err) + isErr = true + break + } + } else { + break // No more pages + } + } + + glog.Infof("Deleted %v files from asset=%v", totalDeleted, asset.ID) + + if ipfs := asset.AssetSpec.Storage.IPFS; ipfs != nil { + err = r.UnpinFromIpfs(ctx, ipfs.CID, "cid") + if err != nil { + glog.Errorf("Error unpinning from IPFS %v", ipfs.CID) + return err + } + err = r.UnpinFromIpfs(ctx, ipfs.NFTMetadata.CID, "nftMetadataCid") + if err != nil { + glog.Errorf("Error unpinning metadata from IPFS %v", ipfs.NFTMetadata.CID) + return err + } + + glog.Infof("Unpinned asset=%v from IPFS", asset.ID) + } + + if isErr { + return errors.New("error deleting files") + } + + err = r.lapi.FlagAssetAsDeleted(asset.ID) + if err != nil { + glog.Errorf("Error flagging asset as deleted: %v", err) + return err + } + + return nil +} + type taskErrInfo struct { RawError, HumanError error Unretriable bool