Skip to content

Commit

Permalink
Merge pull request #3577 from kobergj/PrepareForSSE
Browse files Browse the repository at this point in the history
[tests-only] Prepare for SSEs
  • Loading branch information
kobergj authored Jan 11, 2023
2 parents 580641d + c153997 commit 6286f85
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 1 deletion.
5 changes: 5 additions & 0 deletions changelog/unreleased/prepare-for-sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Prepare for SSE

Prepare for server sent events with some minor changes

https://github.com/cs3org/reva/pull/3577
1 change: 1 addition & 0 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type UploadReady struct {
ExecutingUser *user.User
FileRef *provider.Reference
Failed bool
Timestamp time.Time
// add reference here? We could use it to inform client pp is finished
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,11 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
failed = true
}

now := time.Now()
if p, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], n.ParentID, false); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent")
} else {
// update parent tmtime to propagate etag change
now := time.Now()
_ = p.SetTMTime(&now)
if err := fs.tp.Propagate(ctx, p, 0); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change")
Expand All @@ -288,6 +288,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
UploadID: ev.UploadID,
Failed: failed,
ExecutingUser: ev.ExecutingUser,
Filename: ev.Filename,
FileRef: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: up.Info.MetaData["providerID"],
Expand All @@ -296,6 +297,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan interface{}) {
},
Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])),
},
Timestamp: now,
SpaceOwner: n.SpaceOwnerOrManager(ctx),
},
); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
Expand Down
28 changes: 28 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package utils

import (
"encoding/json"
"errors"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -384,6 +386,18 @@ func AppendPlainToOpaque(o *types.Opaque, key, value string) *types.Opaque {
return o
}

// AppendJSONToOpaque adds a new key value pair as a json on the given opaque and returns it. Ignores errors
func AppendJSONToOpaque(o *types.Opaque, key string, value interface{}) *types.Opaque {
o = ensureOpaque(o)

b, _ := json.Marshal(value)
o.Map[key] = &types.OpaqueEntry{
Decoder: "json",
Value: b,
}
return o
}

// ReadPlainFromOpaque reads a plain string from the given opaque map
func ReadPlainFromOpaque(o *types.Opaque, key string) string {
if o.GetMap() == nil {
Expand All @@ -395,6 +409,20 @@ func ReadPlainFromOpaque(o *types.Opaque, key string) string {
return ""
}

// ReadJSONFromOpaque reads and unmarshals a value from the opaque in the given interface{} (Make sure it's a pointer!)
func ReadJSONFromOpaque(o *types.Opaque, key string, valptr interface{}) error {
if o.GetMap() == nil {
return errors.New("not found")
}

e, ok := o.Map[key]
if !ok || e.Decoder != "json" {
return errors.New("not found")
}

return json.Unmarshal(e.Value, valptr)
}

// ExistsInOpaque returns true if the key exists in the opaque (ignoring the value)
func ExistsInOpaque(o *types.Opaque, key string) bool {
if o.GetMap() == nil {
Expand Down

0 comments on commit 6286f85

Please sign in to comment.