Skip to content

Commit

Permalink
incorporate feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Nov 30, 2023
1 parent 87f5191 commit 8743cda
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 30 deletions.
7 changes: 3 additions & 4 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return nil, err
}

if _, ok := fs.(storage.UploadsManager); ok {
// TODO we can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
if _, ok := fs.(storage.UploadSessionLister); ok {
// We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// TODO we should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
Expand Down Expand Up @@ -194,7 +194,6 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
}
expires := info.MetaData["expires"]
if expires != "" {
// FIXME currently info.MetaData["expires"] is an int ... but it MUST be RFC 7231 datetime format, see https://tus.io/protocols/resumable-upload#upload-expires
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
resourceid := provider.ResourceId{
Expand Down
18 changes: 11 additions & 7 deletions pkg/storage/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
tusd "github.com/tus/tusd/pkg/handler"
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
Expand All @@ -36,8 +37,15 @@ type UploadRequest struct {
Length int64
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
// UploadsManager defines the interface for storage drivers that allow for managing uploads
// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister.
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}

// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions
type UploadSessionLister interface {
// GetUploadProgress returns the upload progress
ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
}
Expand All @@ -63,17 +71,13 @@ type UploadSession interface {
// IsProcessing returns true if postprocessing has not finished, yet
// The actual postprocessing state is tracked in the postprocessing service.
IsProcessing() bool
// MalwareDescription returns the scan result returned by the scanner
MalwareDescription() string
// MalwareScanTime returns the timestamp the upload was scanned. Default time means the item has not been scanned
MalwareScanTime() time.Time

// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge() error
Purge(ctx context.Context) error
}

type UploadSessionFilter struct {
Id *string
ID *string
Processing *bool
Expired *bool
}
12 changes: 5 additions & 7 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
// GetUploadProgress returns the metadata for the given upload id
func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
var sessions []storage.UploadSession
if filter.Id != nil && *filter.Id != "" {
session, err := fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", *filter.Id+".info"))
if filter.ID != nil && *filter.ID != "" {
session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info"))
if err != nil {
return nil, err
}
Expand All @@ -273,7 +273,6 @@ func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.U
if now.After(session.Expires()) {
continue
}

}
}
filteredSessions = append(filteredSessions, session)
Expand Down Expand Up @@ -310,9 +309,9 @@ func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSes
}

for _, info := range infoFiles {
progress, err := fs.getUploadProgress(ctx, info)
progress, err := fs.getUploadSession(ctx, info)
if err != nil {
// Log error?
appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession")
continue
}

Expand All @@ -321,7 +320,7 @@ func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSes
return uploads, nil
}

func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadSession, error) {
func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) {
match := _idRegexp.FindStringSubmatch(path)
if match == nil || len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
Expand All @@ -344,6 +343,5 @@ func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (sto
Info: info,
Processing: n.IsProcessing(ctx),
}
_, progress.ScanStatus, progress.ScanTime = n.ScanData(ctx)
return progress, nil
}
15 changes: 3 additions & 12 deletions pkg/storage/utils/decomposedfs/upload/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,6 @@ type Progress struct {
Path string
Info tusd.FileInfo
Processing bool
ScanStatus string
ScanTime time.Time
}

func (p Progress) ID() string {
Expand All @@ -523,7 +521,7 @@ func (p Progress) Reference() provider.Reference {
ResourceId: &provider.ResourceId{
StorageId: p.Info.MetaData["providerID"],
SpaceId: p.Info.Storage["SpaceRoot"],
OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in Initiate Upload
OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload
},
}
}
Expand All @@ -536,8 +534,8 @@ func (p Progress) Executant() userpb.UserId {
}
func (p Progress) SpaceOwner() *userpb.UserId {
return &userpb.UserId{
// idp and type do not seem to be consumed and the node currently only stores the user id anyway
OpaqueId: p.Info.Storage["SpaceOwnerOrManager"],
// TODO idp and type?
}
}
func (p Progress) Expires() time.Time {
Expand All @@ -548,15 +546,8 @@ func (p Progress) Expires() time.Time {
func (p Progress) IsProcessing() bool {
return p.Processing
}
func (p Progress) MalwareDescription() string {
return p.ScanStatus
}
func (p Progress) MalwareScanTime() time.Time {
return p.ScanTime
}

func (p Progress) Purge() error {
// TODO we should use the upload id to look up the tus upload and Terminate() that
func (p Progress) Purge(ctx context.Context) error {
err := os.Remove(p.Info.Storage["BinPath"])
if err != nil {
return err
Expand Down

0 comments on commit 8743cda

Please sign in to comment.