From 377a23ee7da9c3a534659cc00f181ef781ac63a4 Mon Sep 17 00:00:00 2001 From: kobergj Date: Mon, 18 Dec 2023 11:47:25 +0100 Subject: [PATCH] Non-durable SSE streams (#7986) * make sse streams non-durable Signed-off-by: jkoberg * bump reva Signed-off-by: jkoberg --------- Signed-off-by: jkoberg --- changelog/unreleased/bump-reva.md | 1 + .../unreleased/sse-non-durable-streams.md | 5 ++ go.mod | 8 +-- go.sum | 8 +-- services/antivirus/pkg/service/service.go | 2 +- services/audit/pkg/command/server.go | 2 +- services/clientlog/pkg/command/server.go | 2 +- services/eventhistory/pkg/command/server.go | 2 +- services/frontend/pkg/command/events.go | 2 +- services/graph/pkg/server/http/server.go | 2 +- services/notifications/pkg/command/server.go | 2 +- services/policies/pkg/command/server.go | 2 +- .../pkg/command/postprocessing.go | 2 +- services/postprocessing/pkg/command/server.go | 2 +- .../search/pkg/service/grpc/v0/service.go | 2 +- services/sse/pkg/command/server.go | 2 +- services/storage-users/pkg/event/event.go | 2 +- services/userlog/pkg/command/server.go | 2 +- .../interceptors/eventsmiddleware/events.go | 2 +- .../storageprovider/storageprovider.go | 2 +- .../services/dataprovider/dataprovider.go | 2 +- .../cs3org/reva/v2/pkg/events/stream/nats.go | 13 +++-- .../v2/pkg/rhttp/datatx/metrics/metrics.go | 40 ++++++++++++++ .../v2/pkg/share/manager/jsoncs3/jsoncs3.go | 2 +- .../utils/decomposedfs/decomposedfs.go | 10 ++++ .../storage/utils/decomposedfs/node/node.go | 4 ++ .../pkg/storage/utils/decomposedfs/upload.go | 3 ++ .../utils/decomposedfs/upload/upload.go | 7 +++ .../go-micro/plugins/v4/events/natsjs/nats.go | 52 +++++++++++++------ .../plugins/v4/events/natsjs/options.go | 42 ++++++++++----- vendor/modules.txt | 4 +- 31 files changed, 173 insertions(+), 60 deletions(-) create mode 100644 changelog/unreleased/sse-non-durable-streams.md diff --git a/changelog/unreleased/bump-reva.md b/changelog/unreleased/bump-reva.md index 72915a731f0..66f0c450879 100644 --- a/changelog/unreleased/bump-reva.md +++ b/changelog/unreleased/bump-reva.md @@ -6,3 +6,4 @@ https://github.com/owncloud/ocis/pull/7793 https://github.com/owncloud/ocis/pull/7978 https://github.com/owncloud/ocis/pull/7979 https://github.com/owncloud/ocis/pull/7963 +https://github.com/owncloud/ocis/pull/7986 diff --git a/changelog/unreleased/sse-non-durable-streams.md b/changelog/unreleased/sse-non-durable-streams.md new file mode 100644 index 00000000000..4b3d5700080 --- /dev/null +++ b/changelog/unreleased/sse-non-durable-streams.md @@ -0,0 +1,5 @@ +Bugfix: Non durable streams for sse service + +Configure sse streams to be non-durable. This functionality is not needed for the sse service + +https://github.com/owncloud/ocis/pull/7986 diff --git a/go.mod b/go.mod index a0461f0badf..72e4e96adcd 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.9.0 github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 - github.com/cs3org/reva/v2 v2.17.1-0.20231215155002-7810e3d8be38 + github.com/cs3org/reva/v2 v2.17.1-0.20231218091701-6a3a91e35514 github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e @@ -35,7 +35,7 @@ require ( github.com/go-micro/plugins/v4/registry/nats v1.2.2-0.20230723205323-1ada01245674 github.com/go-micro/plugins/v4/server/grpc v1.2.0 github.com/go-micro/plugins/v4/server/http v1.2.2 - github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 + github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-00010101000000-000000000000 github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0 github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0 github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0 @@ -193,8 +193,8 @@ require ( github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 // indirect - github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-00010101000000-000000000000 // indirect + github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 // indirect + github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 // indirect github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect diff --git a/go.sum b/go.sum index e12647b946f..3d50cc9f512 100644 --- a/go.sum +++ b/go.sum @@ -1021,8 +1021,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.17.1-0.20231215155002-7810e3d8be38 h1:vkA/Ty82yETTDrpLV/b5/9VXUJ/9o7vTRs7ampZC5LU= -github.com/cs3org/reva/v2 v2.17.1-0.20231215155002-7810e3d8be38/go.mod h1:5Yxh1DneWZQvMBOiBVv3LJaSsSlRCoqCBa4Wws7PWHw= +github.com/cs3org/reva/v2 v2.17.1-0.20231218091701-6a3a91e35514 h1:ThhcxiI3Iq8sweI6ZGaEkTzFhum+ev0EGGnDiAK8eW8= +github.com/cs3org/reva/v2 v2.17.1-0.20231218091701-6a3a91e35514/go.mod h1:QW31Q1IQ9ZCJMFv3u8/SdHSyLfCcSVNcRbqIJj+Y+7o= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -1195,8 +1195,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-micro/plugins/v4/client/grpc v1.2.1 h1:7xAwZRCO6mdUtBHsYIQs1/eCTdhCrnjF70GB+AVd6L0= github.com/go-micro/plugins/v4/client/grpc v1.2.1/go.mod h1:3fDuzyfYLwEImn8+lkhKl3W4Ay1jFevkTeC32PBlgQs= -github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 h1:/RpJVLKmKT2OcEnKCPaS6n+zygNzYDzwoYgPQEgcEiQ= -github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7/go.mod h1:lYuiEYKQTpbE2LA8HEcC8D6kQ29M7ILfEak3dzeucEg= +github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 h1:YOIavj+ZgO9HzukpdXZCvQv+AahjW/fTVFVF4QFRabw= +github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9/go.mod h1:cL0O63th39fZ+M/aRJvajz7Qnmv+UTXugOq1k3qrYiQ= github.com/go-micro/plugins/v4/logger/zerolog v1.2.0 h1:JZ516VQ9zekRoi868XG7x0EWxZ2AMq/euHIBChITsTI= github.com/go-micro/plugins/v4/logger/zerolog v1.2.0/go.mod h1:AieYOIeOxobYa5B8WGEqxXM3Ndi26tDIu9fZ4RYkCvQ= github.com/go-micro/plugins/v4/registry/consul v1.2.1 h1:3wctYMtstwQLCjoJ1HA6mKGGFF1hcdKDv5MzHakB1jE= diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 352e3390f6b..24330c273d4 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -96,7 +96,7 @@ func (av Antivirus) Run() error { evtsCfg.TLSInsecure = false } - natsStream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events)) + natsStream, err := stream.NatsFromConfig(av.c.Service.Name, false, stream.NatsConfig(av.c.Events)) if err != nil { return err } diff --git a/services/audit/pkg/command/server.go b/services/audit/pkg/command/server.go index c945ceea786..6c55e638c1f 100644 --- a/services/audit/pkg/command/server.go +++ b/services/audit/pkg/command/server.go @@ -43,7 +43,7 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() - client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index d8399f8ac7a..4dcc14da711 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -57,7 +57,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/eventhistory/pkg/command/server.go b/services/eventhistory/pkg/command/server.go index 4ba389f1a12..da5e53df2bd 100644 --- a/services/eventhistory/pkg/command/server.go +++ b/services/eventhistory/pkg/command/server.go @@ -60,7 +60,7 @@ func Server(cfg *config.Config) *cli.Command { metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) - consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/frontend/pkg/command/events.go b/services/frontend/pkg/command/events.go index e238f607a6a..2ea5f430a18 100644 --- a/services/frontend/pkg/command/events.go +++ b/services/frontend/pkg/command/events.go @@ -39,7 +39,7 @@ var _registeredEvents = []events.Unmarshaller{ // ListenForEvents listens for events and acts accordingly func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error { - bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { l.Error().Err(err).Msg("cannot connect to nats") return err diff --git a/services/graph/pkg/server/http/server.go b/services/graph/pkg/server/http/server.go index dd772dfecf0..cf84d68d4cc 100644 --- a/services/graph/pkg/server/http/server.go +++ b/services/graph/pkg/server/http/server.go @@ -53,7 +53,7 @@ func Server(opts ...Option) (http.Service, error) { if options.Config.Events.Endpoint != "" { var err error - publisher, err = stream.NatsFromConfig(options.Config.Service.Name, stream.NatsConfig(options.Config.Events)) + publisher, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events)) if err != nil { options.Logger.Error(). Err(err). diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index a6355d11295..55b3c7ad3c4 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -89,7 +89,7 @@ func Server(cfg *config.Config) *cli.Command { events.SpaceUnshared{}, events.SpaceMembershipExpired{}, } - client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Notifications.Events)) + client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events)) if err != nil { return err } diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index ec749c24cf3..3af423b02d1 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -110,7 +110,7 @@ func Server(cfg *config.Config) *cli.Command { { - bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index c0108af7342..99c853a0c90 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -30,7 +30,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) if err != nil { return err } diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index c71b278da08..5e1afecc32b 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -54,7 +54,7 @@ func Server(cfg *config.Config) *cli.Command { } { - bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events)) + bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) if err != nil { return err } diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 559264156aa..1c6fb41c77f 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -75,7 +75,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) } - bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{ + bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ Endpoint: cfg.Events.Endpoint, Cluster: cfg.Events.Cluster, EnableTLS: cfg.Events.EnableTLS, diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go index a88025e9461..ac027d606d2 100644 --- a/services/sse/pkg/command/server.go +++ b/services/sse/pkg/command/server.go @@ -59,7 +59,7 @@ func Server(cfg *config.Config) *cli.Command { } { - natsStream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + natsStream, err := stream.NatsFromConfig(cfg.Service.Name, true, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/storage-users/pkg/event/event.go b/services/storage-users/pkg/event/event.go index 957dc71f7b9..ef263085b35 100644 --- a/services/storage-users/pkg/event/event.go +++ b/services/storage-users/pkg/event/event.go @@ -8,7 +8,7 @@ import ( // NewStream prepares the requested nats stream and returns it. func NewStream(cfg *config.Config) (events.Stream, error) { - return stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{ + return stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ Endpoint: cfg.Events.Addr, Cluster: cfg.Events.ClusterID, EnableTLS: cfg.Events.EnableTLS, diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 0498cac5fb5..5529ef9537d 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go index 35c4045d426..4162503d1db 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go @@ -228,7 +228,7 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) { if ok { tlsCert = val.(string) } - return stream.NatsFromConfig(m["name"].(string), stream.NatsConfig{ + return stream.NatsFromConfig(m["name"].(string), false, stream.NatsConfig{ Endpoint: m["address"].(string), Cluster: m["clusterID"].(string), EnableTLS: m["enable-tls"].(bool), diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/storageprovider/storageprovider.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/storageprovider/storageprovider.go index f7812783cf6..5b1274c0e4a 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/storageprovider/storageprovider.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/storageprovider/storageprovider.go @@ -1267,5 +1267,5 @@ func estreamFromConfig(c eventconfig) (events.Stream, error) { return nil, nil } - return stream.NatsFromConfig("storageprovider", stream.NatsConfig(c)) + return stream.NatsFromConfig("storageprovider", false, stream.NatsConfig(c)) } diff --git a/vendor/github.com/cs3org/reva/v2/internal/http/services/dataprovider/dataprovider.go b/vendor/github.com/cs3org/reva/v2/internal/http/services/dataprovider/dataprovider.go index 8c3a31195ff..cb930b95a73 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/http/services/dataprovider/dataprovider.go +++ b/vendor/github.com/cs3org/reva/v2/internal/http/services/dataprovider/dataprovider.go @@ -80,7 +80,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) if conf.NatsAddress == "" || conf.NatsClusterID == "" { log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.") } else { - s, err := stream.NatsFromConfig("dataprovider", stream.NatsConfig{ + s, err := stream.NatsFromConfig("dataprovider", false, stream.NatsConfig{ Endpoint: conf.NatsAddress, Cluster: conf.NatsClusterID, EnableTLS: conf.NatsEnableTLS, diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/stream/nats.go b/vendor/github.com/cs3org/reva/v2/pkg/events/stream/nats.go index f302822fc7a..4910d694e23 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/stream/nats.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/stream/nats.go @@ -25,7 +25,7 @@ type NatsConfig struct { } // NatsFromConfig returns a nats stream from the given config -func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) { +func NatsFromConfig(connName string, disableDurability bool, cfg NatsConfig) (events.Stream, error) { var tlsConf *tls.Config if cfg.EnableTLS { var rootCAPool *x509.CertPool @@ -48,13 +48,20 @@ func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) { RootCAs: rootCAPool, } } - return Nats( + + opts := []natsjs.Option{ natsjs.TLSConfig(tlsConf), natsjs.Address(cfg.Endpoint), natsjs.ClusterID(cfg.Cluster), natsjs.SynchronousPublish(true), natsjs.Name(connName), - ) + } + + if disableDurability { + opts = append(opts, natsjs.DisableDurableStreams()) + } + + return Nats(opts...) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics/metrics.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics/metrics.go index f090a04c338..80ca36c2c13 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics/metrics.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics/metrics.go @@ -17,4 +17,44 @@ var ( Name: "reva_upload_active", Help: "Number of active uploads", }) + // UploadProcessing is the number of uploads in processing + UploadProcessing = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "reva_upload_processing", + Help: "Number of uploads in processing", + }) + // UploadSessionsInitiated is the number of upload sessions that have been initiated + UploadSessionsInitiated = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_initiated", + Help: "Number of uploads sessions that were initiated", + }) + // UploadSessionsBytesReceived is the number of upload sessions that have received all bytes + UploadSessionsBytesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_bytes_received", + Help: "Number of uploads sessions that have received all bytes", + }) + // UploadSessionsFinalized is the number of upload sessions that have received all bytes + UploadSessionsFinalized = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_finalized", + Help: "Number of uploads sessions that have successfully completed", + }) + // UploadSessionsAborted is the number of upload sessions that have been aborted + UploadSessionsAborted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_aborted", + Help: "Number of uploads sessions that have aborted by postprocessing", + }) + // UploadSessionsDeleted is the number of upload sessions that have been deleted + UploadSessionsDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_deleted", + Help: "Number of uploads sessions that have been deleted by postprocessing", + }) + // UploadSessionsRestarted is the number of upload sessions that have been restarted + UploadSessionsRestarted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_restarted", + Help: "Number of uploads sessions that have been restarted by postprocessing", + }) + // UploadSessionsScanned is the number of upload sessions that have been scanned by antivirus + UploadSessionsScanned = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_scanned", + Help: "Number of uploads sessions that have been scanned by antivirus", + }) ) diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go index 220c34b3f6d..f0fa2cbd51b 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go @@ -175,7 +175,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { var es events.Stream if c.Events.Endpoint != "" { - es, err = stream.NatsFromConfig("jsoncs3-share-manager", stream.NatsConfig(c.Events)) + es, err = stream.NatsFromConfig("jsoncs3-share-manager", false, stream.NatsConfig(c.Events)) if err != nil { return nil, err } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go index 60662ee814b..23384d55364 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" @@ -268,14 +269,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { case events.PPOutcomeAbort: failed = true keepUpload = true + metrics.UploadSessionsAborted.Inc() case events.PPOutcomeContinue: if err := up.Finalize(); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") keepUpload = true // should we keep the upload when assembling failed? failed = true + } else { + metrics.UploadSessionsFinalized.Inc() } case events.PPOutcomeDelete: failed = true + metrics.UploadSessionsDeleted.Inc() } getParent := func() *node.Node { @@ -344,6 +349,9 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") continue } + + metrics.UploadSessionsRestarted.Inc() + // restart postprocessing if err := events.Publish(ctx, fs.stream, events.BytesReceived{ UploadID: up.Info.ID, @@ -471,6 +479,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { continue } + metrics.UploadSessionsScanned.Inc() + // remove cache entry in gateway fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) default: diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go index 3eb29ea4d54..a5f9c2b1dfd 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go @@ -41,6 +41,7 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/mime" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage/utils/ace" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" @@ -1220,6 +1221,9 @@ func (n *Node) FindStorageSpaceRoot(ctx context.Context) error { // UnmarkProcessing removes the processing flag from the node func (n *Node) UnmarkProcessing(ctx context.Context, uploadID string) error { + // we currently have to decrease the counter for every processing run to match the incrases + metrics.UploadProcessing.Sub(1) + v, _ := n.XattrString(ctx, prefixes.StatusPrefix) if v != ProcessingStatus+uploadID { // file started another postprocessing later - do not remove diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index 59ef12a4e17..64fd575d143 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -215,6 +216,8 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere info, _ = upload.GetInfo(ctx) + metrics.UploadSessionsInitiated.Inc() + return map[string]string{ "simple": info.ID, "tus": info.ID, diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go index c3e04460542..cb00376a5eb 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go @@ -39,6 +39,7 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -265,6 +266,11 @@ func (upload *Upload) FinishUpload(_ context.Context) error { return err } + // increase the processing counter for every started processing + // will be decreased in Cleanup() + metrics.UploadProcessing.Inc() + metrics.UploadSessionsBytesReceived.Inc() + upload.Node = n if upload.pub != nil { @@ -295,6 +301,7 @@ func (upload *Upload) FinishUpload(_ context.Context) error { log.Error().Err(err).Msg("failed to upload") return err } + metrics.UploadSessionsFinalized.Inc() } return upload.tp.Propagate(upload.Ctx, n, upload.SizeDiff) diff --git a/vendor/github.com/go-micro/plugins/v4/events/natsjs/nats.go b/vendor/github.com/go-micro/plugins/v4/events/natsjs/nats.go index 0b4a984bc83..3deadfdeeb1 100644 --- a/vendor/github.com/go-micro/plugins/v4/events/natsjs/nats.go +++ b/vendor/github.com/go-micro/plugins/v4/events/natsjs/nats.go @@ -1,3 +1,4 @@ +// Package natsjs provides a NATS Jetstream implementation of the events.Stream interface. package natsjs import ( @@ -33,11 +34,14 @@ func NewStream(opts ...Option) (events.Stream, error) { } s := &stream{opts: options} + natsJetStreamCtx, err := connectToNatsJetStream(options) if err != nil { - return nil, fmt.Errorf("error connecting to nats cluster %v: %v", options.ClusterID, err) + return nil, fmt.Errorf("error connecting to nats cluster %v: %w", options.ClusterID, err) } + s.natsJetStreamCtx = natsJetStreamCtx + return s, nil } @@ -52,6 +56,7 @@ func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) { nopts.Secure = true nopts.TLSConfig = options.TLSConfig } + if options.NkeyConfig != "" { nopts.Nkey = options.NkeyConfig } @@ -63,14 +68,21 @@ func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) { if options.Name != "" { nopts.Name = options.Name } + + if options.Username != "" && options.Password != "" { + nopts.User = options.Username + nopts.Password = options.Password + } + conn, err := nopts.Connect() if err != nil { - return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err) + tls := nopts.TLSConfig != nil + return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %w", options.Address, tls, err) } js, err := conn.JetStream() if err != nil { - return nil, fmt.Errorf("error while obtaining JetStream context: %v", err) + return nil, fmt.Errorf("error while obtaining JetStream context: %w", err) } return js, nil @@ -125,6 +137,7 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp if err != nil { err = errors.Wrap(err, "Error publishing message to topic") } + return err } @@ -154,14 +167,14 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve } // setup the subscriber - c := make(chan events.Event) - handleMsg := func(m *nats.Msg) { + channel := make(chan events.Event) + handleMsg := func(msg *nats.Msg) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() // decode the message var evt events.Event - if err := json.Unmarshal(m.Data, &evt); err != nil { + if err := json.Unmarshal(msg.Data, &evt); err != nil { log.Logf(logger.ErrorLevel, "Error decoding message: %v", err) // not acknowledging the message is the way to indicate an error occurred return @@ -170,23 +183,23 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve if !options.AutoAck { // set up the ack funcs evt.SetAckFunc(func() error { - return m.Ack() + return msg.Ack() }) evt.SetNackFunc(func() error { - return m.Nak() + return msg.Nak() }) } // push onto the channel and wait for the consumer to take the event off before we acknowledge it. - c <- evt + channel <- evt if !options.AutoAck { return } - if err := m.Ack(nats.Context(ctx)); err != nil { + + if err := msg.Ack(nats.Context(ctx)); err != nil { log.Logf(logger.ErrorLevel, "Error acknowledging message: %v", err) } - } // ensure that a stream exists for that topic @@ -203,9 +216,7 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve } // setup the options - subOpts := []nats.SubOpt{ - nats.Durable(options.Group), - } + subOpts := []nats.SubOpt{} if options.CustomRetries { subOpts = append(subOpts, nats.MaxDeliver(options.GetRetryLimit())) @@ -227,11 +238,18 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve subOpts = append(subOpts, nats.AckWait(options.AckWait)) } - // connect the subscriber - _, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...) + // connect the subscriber via a queue group only if durable streams are enabled + if !s.opts.DisableDurableStreams { + subOpts = append(subOpts, nats.Durable(options.Group)) + _, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...) + } else { + subOpts = append(subOpts, nats.ConsumerName(options.Group)) + _, err = s.natsJetStreamCtx.Subscribe(topic, handleMsg, subOpts...) + } + if err != nil { return nil, errors.Wrap(err, "Error subscribing to topic") } - return c, nil + return channel, nil } diff --git a/vendor/github.com/go-micro/plugins/v4/events/natsjs/options.go b/vendor/github.com/go-micro/plugins/v4/events/natsjs/options.go index 8418628ad19..f7e83d5f1fe 100644 --- a/vendor/github.com/go-micro/plugins/v4/events/natsjs/options.go +++ b/vendor/github.com/go-micro/plugins/v4/events/natsjs/options.go @@ -8,14 +8,17 @@ import ( // Options which are used to configure the nats stream. type Options struct { - ClusterID string - ClientID string - Address string - NkeyConfig string - TLSConfig *tls.Config - Logger logger.Logger - SyncPublish bool - Name string + ClusterID string + ClientID string + Address string + NkeyConfig string + TLSConfig *tls.Config + Logger logger.Logger + SyncPublish bool + Name string + DisableDurableStreams bool + Username string + Password string } // Option is a function which configures options. @@ -49,30 +52,45 @@ func TLSConfig(t *tls.Config) Option { } } -// Nkey string to use when connecting to the cluster. +// NkeyConfig string to use when connecting to the cluster. func NkeyConfig(nkey string) Option { return func(o *Options) { o.NkeyConfig = nkey } } -// Logger sets the underlyin logger +// Logger sets the underlying logger. func Logger(log logger.Logger) Option { return func(o *Options) { o.Logger = log } } -// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous +// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous. func SynchronousPublish(sync bool) Option { return func(o *Options) { o.SyncPublish = sync } } -// Name allows to add a name to the natsjs connection +// Name allows to add a name to the natsjs connection. func Name(name string) Option { return func(o *Options) { o.Name = name } } + +// DisableDurableStreams will disable durable streams. +func DisableDurableStreams() Option { + return func(o *Options) { + o.DisableDurableStreams = true + } +} + +// Authenticate authenticates the connection with the given username and password. +func Authenticate(username, password string) Option { + return func(o *Options) { + o.Username = username + o.Password = password + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d112330434c..14164290615 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -362,7 +362,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.17.1-0.20231215155002-7810e3d8be38 +# github.com/cs3org/reva/v2 v2.17.1-0.20231218091701-6a3a91e35514 ## explicit; go 1.21 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime @@ -904,7 +904,7 @@ github.com/go-logr/stdr # github.com/go-micro/plugins/v4/client/grpc v1.2.1 ## explicit; go 1.17 github.com/go-micro/plugins/v4/client/grpc -# github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 +# github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 ## explicit; go 1.17 github.com/go-micro/plugins/v4/events/natsjs # github.com/go-micro/plugins/v4/logger/zerolog v1.2.0