diff --git a/changelog/unreleased/prepare-for-sse.md b/changelog/unreleased/prepare-for-sse.md new file mode 100644 index 0000000000..49a0cee3c6 --- /dev/null +++ b/changelog/unreleased/prepare-for-sse.md @@ -0,0 +1,5 @@ +Enhancement: Prepare for SSE + +Prepare for server sent events with some minor changes + +https://github.com/cs3org/reva/pull/3577 diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go index d49ff9e920..23c26090d7 100644 --- a/pkg/events/postprocessing.go +++ b/pkg/events/postprocessing.go @@ -153,6 +153,7 @@ type UploadReady struct { ExecutingUser *user.User FileRef *provider.Reference Failed bool + Timestamp time.Time // add reference here? We could use it to inform client pp is finished } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index fa3c57f7ca..a7d9ecffdf 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -266,11 +266,11 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { failed = true } + now := time.Now() if p, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], n.ParentID, false); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") } else { // update parent tmtime to propagate etag change - now := time.Now() _ = p.SetTMTime(&now) if err := fs.tp.Propagate(ctx, p, 0); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") @@ -288,6 +288,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { UploadID: ev.UploadID, Failed: failed, ExecutingUser: ev.ExecutingUser, + Filename: ev.Filename, FileRef: &provider.Reference{ ResourceId: &provider.ResourceId{ StorageId: up.Info.MetaData["providerID"], @@ -296,6 +297,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) { }, Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])), }, + Timestamp: now, + SpaceOwner: n.SpaceOwnerOrManager(ctx), }, ); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 14314cf5eb..8a4fb54be3 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -19,6 +19,8 @@ package utils import ( + "encoding/json" + "errors" "math/rand" "net" "net/http" @@ -384,6 +386,18 @@ func AppendPlainToOpaque(o *types.Opaque, key, value string) *types.Opaque { return o } +// AppendJSONToOpaque adds a new key value pair as a json on the given opaque and returns it. Ignores errors +func AppendJSONToOpaque(o *types.Opaque, key string, value interface{}) *types.Opaque { + o = ensureOpaque(o) + + b, _ := json.Marshal(value) + o.Map[key] = &types.OpaqueEntry{ + Decoder: "json", + Value: b, + } + return o +} + // ReadPlainFromOpaque reads a plain string from the given opaque map func ReadPlainFromOpaque(o *types.Opaque, key string) string { if o.GetMap() == nil { @@ -395,6 +409,20 @@ func ReadPlainFromOpaque(o *types.Opaque, key string) string { return "" } +// ReadJSONFromOpaque reads and unmarshals a value from the opaque in the given interface{} (Make sure it's a pointer!) +func ReadJSONFromOpaque(o *types.Opaque, key string, valptr interface{}) error { + if o.GetMap() == nil { + return errors.New("not found") + } + + e, ok := o.Map[key] + if !ok || e.Decoder != "json" { + return errors.New("not found") + } + + return json.Unmarshal(e.Value, valptr) +} + // ExistsInOpaque returns true if the key exists in the opaque (ignoring the value) func ExistsInOpaque(o *types.Opaque, key string) bool { if o.GetMap() == nil {