Skip to content

Commit

Permalink
make sse streams non-durable
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Dec 18, 2023
1 parent 96c0e71 commit 28028cc
Show file tree
Hide file tree
Showing 16 changed files with 20 additions and 14 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/bump-reva.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ https://github.com/owncloud/ocis/pull/7793
https://github.com/owncloud/ocis/pull/7978
https://github.com/owncloud/ocis/pull/7979
https://github.com/owncloud/ocis/pull/7963
https://github.com/owncloud/ocis/pull/7986
5 changes: 5 additions & 0 deletions changelog/unreleased/sse-non-durable-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Non durable streams for sse service

Configure sse streams to be non-durable. This functionality is not needed for the sse service

https://github.com/owncloud/ocis/pull/7986
2 changes: 1 addition & 1 deletion services/antivirus/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (av Antivirus) Run() error {
evtsCfg.TLSInsecure = false
}

natsStream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events))
natsStream, err := stream.NatsFromConfig(av.c.Service.Name, false, stream.NatsConfig(av.c.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()

client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/clientlog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Server(cfg *config.Config) *cli.Command {

defer cancel()

stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/eventhistory/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Server(cfg *config.Config) *cli.Command {

metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)

consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/frontend/pkg/command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _registeredEvents = []events.Unmarshaller{

// ListenForEvents listens for events and acts accordingly
func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error {
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
l.Error().Err(err).Msg("cannot connect to nats")
return err
Expand Down
2 changes: 1 addition & 1 deletion services/graph/pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Server(opts ...Option) (http.Service, error) {

if options.Config.Events.Endpoint != "" {
var err error
publisher, err = stream.NatsFromConfig(options.Config.Service.Name, stream.NatsConfig(options.Config.Events))
publisher, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events))
if err != nil {
options.Logger.Error().
Err(err).
Expand Down
2 changes: 1 addition & 1 deletion services/notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Server(cfg *config.Config) *cli.Command {
events.SpaceUnshared{},
events.SpaceMembershipExpired{},
}
client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Notifications.Events))
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/policies/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func Server(cfg *config.Config) *cli.Command {

{

bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/postprocessing/pkg/command/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/postprocessing/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func Server(cfg *config.Config) *cli.Command {
}

{
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/search/pkg/service/grpc/v0/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}

bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
Expand Down
2 changes: 1 addition & 1 deletion services/sse/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Server(cfg *config.Config) *cli.Command {
}

{
natsStream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
natsStream, err := stream.NatsFromConfig(cfg.Service.Name, true, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/storage-users/pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// NewStream prepares the requested nats stream and returns it.
func NewStream(cfg *config.Config) (events.Stream, error) {
return stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
return stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
Endpoint: cfg.Events.Addr,
Cluster: cfg.Events.ClusterID,
EnableTLS: cfg.Events.EnableTLS,
Expand Down
2 changes: 1 addition & 1 deletion services/userlog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {

defer cancel()

stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down

0 comments on commit 28028cc

Please sign in to comment.