diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go index 23c26090d7..85ef745a61 100644 --- a/pkg/events/postprocessing.go +++ b/pkg/events/postprocessing.go @@ -37,8 +37,8 @@ type ( var ( // PPStepAntivirus is the step that scans for viruses PPStepAntivirus Postprocessingstep = "virusscan" - // PPStepFTS is the step that indexes files for full text search - PPStepFTS Postprocessingstep = "fts" + // PPStepPolicies is the step the step that enforces policies + PPStepPolicies Postprocessingstep = "policies" // PPStepDelay is the step that processing. Useful for testing or user annoyment PPStepDelay Postprocessingstep = "delay" @@ -68,26 +68,6 @@ func (BytesReceived) Unmarshal(v []byte) (interface{}, error) { return e, err } -// VirusscanFinished is emitted by the server when it has completed an antivirus scan -type VirusscanFinished struct { - Infected bool - Outcome PostprocessingOutcome - UploadID string - Filename string - ExecutingUser *user.User - Description string - Scandate time.Time - ResourceID *provider.ResourceId - ErrorMsg string // empty when no error -} - -// Unmarshal to fulfill umarshaller interface -func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) { - e := VirusscanFinished{} - err := json.Unmarshal(v, &e) - return e, err -} - // StartPostprocessingStep can be issued by the server to start a postprocessing step type StartPostprocessingStep struct { UploadID string @@ -116,7 +96,7 @@ type PostprocessingStepFinished struct { Filename string FinishedStep Postprocessingstep // name of the step - Result interface{} // result information + Result interface{} // result information see VirusscanResult for example Error error // possible error of the step Outcome PostprocessingOutcome // some services may cause postprocessing to stop } @@ -128,6 +108,15 @@ func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) { return e, err } +// VirusscanResult is the Result of a PostprocessingStepFinished event from the antivirus +type VirusscanResult struct { + Infected bool + Description string + Scandate time.Time + ResourceID *provider.ResourceId + ErrorMsg string // empty when no error +} + // PostprocessingFinished is emitted by *some* service which can decide that type PostprocessingFinished struct { UploadID string diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index 57d2b96882..c7326ba231 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -155,8 +155,10 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { case "DELETE": handler.DelFile(w, r) case "GET": + // NOTE: this is breaking change - allthought it does not seem to be used + // We can make a switch here depending on some header value if that is needed + // download.GetOrHeadFile(w, r, fs, "") handler.GetFile(w, r) - //download.GetOrHeadFile(w, r, fs, "") default: w.WriteHeader(http.StatusNotImplemented) } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index f3fbf1d5b3..dbf24ea337 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -169,7 +169,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event return nil, errors.New("need nats for async file processing") } - ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{}) + ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{}) if err != nil { return nil, err } @@ -266,9 +266,14 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") } - /* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED - case events.VirusscanFinished: - if ev.ErrorMsg != "" { + case events.PostprocessingStepFinished: + if ev.FinishedStep != events.PPStepAntivirus { + // atm we are only interested in antivirus results + continue + } + + res := ev.Result.(events.VirusscanResult) + if res.ErrorMsg != "" { // scan failed somehow // Should we handle this here? continue @@ -278,6 +283,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { switch ev.UploadID { case "": // uploadid is empty -> this was an on-demand scan + /* ON DEMAND SCANNING NOT SUPPORTED ATM ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) ref := &provider.Reference{ResourceId: ev.ResourceID} @@ -352,6 +358,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } + */ default: // uploadid is not empty -> this is an async upload up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) @@ -360,7 +367,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { continue } - no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false) + no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, false) if err != nil { log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") continue @@ -369,14 +376,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { n = no } - if err := n.SetScanData(ev.Description, ev.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results") + if err := n.SetScanData(res.Description, res.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") continue } // remove cache entry in gateway fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - */ default: log.Error().Interface("event", ev).Msg("Unknown event") }