Skip to content

Commit

Permalink
Merge pull request #3217 from owncloud/notifications-service
Browse files Browse the repository at this point in the history
Enh: First version of the notifications service
  • Loading branch information
David Christofas authored Feb 24, 2022
2 parents 4bc85c9 + e160615 commit 8b2ee7a
Show file tree
Hide file tree
Showing 19 changed files with 523 additions and 0 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/notifications-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Implement notifications service

Implemented the minimal version of the notifications service to be able to notify a user when they received a share.

https://github.com/owncloud/ocis/pull/3217
14 changes: 14 additions & 0 deletions notifications/cmd/notifications/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"os"

"github.com/owncloud/ocis/notifications/pkg/command"
"github.com/owncloud/ocis/notifications/pkg/config"
)

func main() {
if err := command.Execute(config.DefaultConfig()); err != nil {
os.Exit(1)
}
}
105 changes: 105 additions & 0 deletions notifications/pkg/channels/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Package channels provides different communication channels to notify users.
package channels

import (
"context"
"net/smtp"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
groups "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/log"
"github.com/pkg/errors"
)

// Channel defines the methods of a communication channel.
type Channel interface {
// SendMessage sends a message to users.
SendMessage(userIDs []string, msg string) error
// SendMessageToGroup sends a message to a group.
SendMessageToGroup(groupdID *groups.GroupId, msg string) error
}

// NewMailChannel instantiates a new mail communication channel.
func NewMailChannel(cfg config.Config, logger log.Logger) (Channel, error) {
gc, err := pool.GetGatewayServiceClient(cfg.Notifications.RevaGateway)
if err != nil {
logger.Error().Err(err).Msg("could not get gateway client")
return nil, err
}
return Mail{
gatewayClient: gc,
conf: cfg,
logger: logger,
}, nil
}

// Mail is the communcation channel for email.
type Mail struct {
gatewayClient gateway.GatewayAPIClient
conf config.Config
logger log.Logger
}

// SendMessage sends a message to all given users.
func (m Mail) SendMessage(userIDs []string, msg string) error {
to, err := m.getReceiverAddresses(userIDs)
if err != nil {
return err
}
body := []byte(msg)

smtpConf := m.conf.Notifications.SMTP
auth := smtp.PlainAuth("", smtpConf.Sender, smtpConf.Password, smtpConf.Host)
if err := smtp.SendMail(smtpConf.Host+":"+smtpConf.Port, auth, smtpConf.Sender, to, body); err != nil {
return errors.Wrap(err, "could not send mail")
}
return nil
}

// SendMessageToGroup sends a message to all members of the given group.
func (m Mail) SendMessageToGroup(groupID *groups.GroupId, msg string) error {
// TODO We need an authenticated context here...
res, err := m.gatewayClient.GetGroup(context.Background(), &groups.GetGroupRequest{GroupId: groupID})
if err != nil {
return err
}
if res.Status.Code != rpc.Code_CODE_OK {
return errors.New("could not get group")
}

members := make([]string, 0, len(res.Group.Members))
for _, id := range res.Group.Members {
members = append(members, id.OpaqueId)
}

return m.SendMessage(members, msg)
}

func (m Mail) getReceiverAddresses(receivers []string) ([]string, error) {
addresses := make([]string, 0, len(receivers))
for _, id := range receivers {
// Authenticate is too costly but at the moment our only option to get the user.
// We don't have an authenticated context so calling `GetUser` doesn't work.
res, err := m.gatewayClient.Authenticate(context.Background(), &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + id,
ClientSecret: m.conf.Notifications.MachineAuthSecret,
})
if err != nil {
return nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
m.logger.Error().
Interface("status", res.Status).
Str("receiver_id", id).
Msg("could not get user")
continue
}
addresses = append(addresses, res.User.Mail)
}

return addresses, nil
}
18 changes: 18 additions & 0 deletions notifications/pkg/command/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package command

import (
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/urfave/cli/v2"
)

// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}
64 changes: 64 additions & 0 deletions notifications/pkg/command/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package command

import (
"context"
"os"

"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/ocis-pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)

// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),

// interaction with this service

// infos about this service
Health(cfg),
Version(cfg),
}
}

// Execute is the entry point for the notifications command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "notifications",
Usage: "starts notifications service",
Commands: GetCommands(cfg),
})

cli.HelpFlag = &cli.BoolFlag{
Name: "help,h",
Usage: "Show the help",
}

return app.Run(os.Args)
}

// SutureService allows for the notifications command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}

// NewSutureService creates a new notifications.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Settings.Commons = cfg.Commons
return SutureService{
cfg: cfg.Notifications,
}
}

func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
if err := Execute(s.cfg); err != nil {
return err
}

return nil
}
50 changes: 50 additions & 0 deletions notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package command

import (
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/cs3org/reva/pkg/events"
"github.com/cs3org/reva/pkg/events/server"
"github.com/owncloud/ocis/notifications/pkg/channels"
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/notifications/pkg/config/parser"
"github.com/owncloud/ocis/notifications/pkg/logging"
"github.com/owncloud/ocis/notifications/pkg/service"
"github.com/urfave/cli/v2"
)

// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start %s extension without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
return parser.ParseConfig(cfg)
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)

evs := []events.Unmarshaller{
events.ShareCreated{},
}

evtsCfg := cfg.Notifications.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...)
if err != nil {
return err
}
channel, err := channels.NewMailChannel(*cfg, logger)
if err != nil {
return err
}
svc := service.NewEventsNotifier(evts, channel, logger)
return svc.Run()
},
}
}
19 changes: 19 additions & 0 deletions notifications/pkg/command/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package command

import (
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/urfave/cli/v2"
)

// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running extension instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}
44 changes: 44 additions & 0 deletions notifications/pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package config

import (
"context"

"github.com/owncloud/ocis/ocis-pkg/shared"
)

// Config combines all available configuration parts.
type Config struct {
*shared.Commons

Service Service

Log *Log `ocisConfig:"log"`
Debug Debug `ocisConfig:"debug"`

Notifications Notifications `ocisConfig:"notifications"`

Context context.Context
}

// Notifications definces the config options for the notifications service.
type Notifications struct {
SMTP SMTP `ocisConfig:"SMTP"`
Events Events `ocisConfig:"events"`
RevaGateway string `ocisConfig:"reva_gateway" env:"REVA_GATEWAY;NOTIFICATIONS_REVA_GATEWAY"`
MachineAuthSecret string `ocisConfig:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;NOTIFICATIONS_MACHINE_AUTH_API_KEY"`
}

// SMTP combines the smtp configuration options.
type SMTP struct {
Host string `ocisConfig:"smtp_host" env:"NOTIFICATIONS_SMTP_HOST"`
Port string `ocisConfig:"smtp_port" env:"NOTIFICATIONS_SMTP_PORT"`
Sender string `ocisConfig:"smtp_sender" env:"NOTIFICATIONS_SMTP_SENDER"`
Password string `ocisConfig:"smtp_password" env:"NOTIFICATIONS_SMTP_PASSWORD"`
}

// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `ocisConfig:"events_endpoint" env:"NOTIFICATIONS_EVENTS_ENDPOINT"`
Cluster string `ocisConfig:"events_cluster" env:"NOTIFICATIONS_EVENTS_CLUSTER"`
ConsumerGroup string `ocisConfig:"events_group" env:"NOTIFICATIONS_EVENTS_GROUP"`
}
9 changes: 9 additions & 0 deletions notifications/pkg/config/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

// Debug defines the available debug configuration.
type Debug struct {
Addr string `ocisConfig:"addr" env:"NOTIFICATIONS_DEBUG_ADDR"`
Token string `ocisConfig:"token" env:"NOTIFICATIONS_DEBUG_TOKEN"`
Pprof bool `ocisConfig:"pprof" env:"NOTIFICATIONS_DEBUG_PPROF"`
Zpages bool `ocisConfig:"zpages" env:"NOTIFICATIONS_DEBUG_ZPAGES"`
}
27 changes: 27 additions & 0 deletions notifications/pkg/config/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package config

// NOTE: Most of this configuration is not needed to keep it as simple as possible
// TODO: Clean up unneeded configuration

func DefaultConfig() *Config {
return &Config{
Service: Service{
Name: "notifications",
},
Notifications: Notifications{
SMTP: SMTP{
Host: "127.0.0.1",
Port: "1025",
Sender: "[email protected]",
Password: "godisdead",
},
Events: Events{
Endpoint: "127.0.0.1:4222",
Cluster: "test-cluster",
ConsumerGroup: "notifications",
},
RevaGateway: "127.0.0.1:9142",
MachineAuthSecret: "change-me-please",
},
}
}
9 changes: 9 additions & 0 deletions notifications/pkg/config/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;NOTIFICATIONS_LOG_LEVEL"`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;NOTIFICATIONS_LOG_PRETTY"`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;NOTIFICATIONS_LOG_COLOR"`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;NOTIFICATIONS_LOG_FILE"`
}
Loading

0 comments on commit 8b2ee7a

Please sign in to comment.