diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go index 23c26090d70..85ef745a61f 100644 --- a/pkg/events/postprocessing.go +++ b/pkg/events/postprocessing.go @@ -37,8 +37,8 @@ type ( var ( // PPStepAntivirus is the step that scans for viruses PPStepAntivirus Postprocessingstep = "virusscan" - // PPStepFTS is the step that indexes files for full text search - PPStepFTS Postprocessingstep = "fts" + // PPStepPolicies is the step the step that enforces policies + PPStepPolicies Postprocessingstep = "policies" // PPStepDelay is the step that processing. Useful for testing or user annoyment PPStepDelay Postprocessingstep = "delay" @@ -68,26 +68,6 @@ func (BytesReceived) Unmarshal(v []byte) (interface{}, error) { return e, err } -// VirusscanFinished is emitted by the server when it has completed an antivirus scan -type VirusscanFinished struct { - Infected bool - Outcome PostprocessingOutcome - UploadID string - Filename string - ExecutingUser *user.User - Description string - Scandate time.Time - ResourceID *provider.ResourceId - ErrorMsg string // empty when no error -} - -// Unmarshal to fulfill umarshaller interface -func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) { - e := VirusscanFinished{} - err := json.Unmarshal(v, &e) - return e, err -} - // StartPostprocessingStep can be issued by the server to start a postprocessing step type StartPostprocessingStep struct { UploadID string @@ -116,7 +96,7 @@ type PostprocessingStepFinished struct { Filename string FinishedStep Postprocessingstep // name of the step - Result interface{} // result information + Result interface{} // result information see VirusscanResult for example Error error // possible error of the step Outcome PostprocessingOutcome // some services may cause postprocessing to stop } @@ -128,6 +108,15 @@ func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) { return e, err } +// VirusscanResult is the Result of a PostprocessingStepFinished event from the antivirus +type VirusscanResult struct { + Infected bool + Description string + Scandate time.Time + ResourceID *provider.ResourceId + ErrorMsg string // empty when no error +} + // PostprocessingFinished is emitted by *some* service which can decide that type PostprocessingFinished struct { UploadID string diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 1273caeee9c..cda5c49b663 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -159,7 +159,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event return nil, errors.New("need nats for async file processing") } - ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{}) + ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{}) if err != nil { return nil, err } @@ -256,9 +256,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") } - /* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED - case events.VirusscanFinished: - if ev.ErrorMsg != "" { + case events.PostprocessingStepFinished: + if ev.FinishedStep != events.PPStepAntivirus { + // atm we are only interested in antivirus results + } + + res := ev.Result.(events.VirusscanResult) + if res.ErrorMsg != "" { // scan failed somehow // Should we handle this here? continue @@ -268,6 +272,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { switch ev.UploadID { case "": // uploadid is empty -> this was an on-demand scan + /* ON DEMAND SCANNING NOT SUPPORTED ATM ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) ref := &provider.Reference{ResourceId: ev.ResourceID} @@ -342,6 +347,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } + */ default: // uploadid is not empty -> this is an async upload up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) @@ -359,14 +365,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { n = no } - if err := n.SetScanData(ev.Description, ev.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results") + if err := n.SetScanData(res.Description, res.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") continue } // remove cache entry in gateway fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - */ default: log.Error().Interface("event", ev).Msg("Unknown event") }