Skip to content

Commit

Permalink
feat(notification): add rate-limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Carlier authored and ncarlier committed Jan 29, 2022
1 parent e00d697 commit 6a75c59
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 42 deletions.
4 changes: 4 additions & 0 deletions etc/default/readflow.env
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ READFLOW_LOG_LEVEL="info"
# Output human readable logs, default is "false"
READFLOW_LOG_PRETTY=false

# Rate limiting for notifications
# Default: "memory://?tokens=5&interval=24h"
#READFLOW_NOTIFICATION_RATE_LIMITING=

# Pocket consumer key, deactivated by default
READFLOW_POCKET_CONSUMER_KEY=

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/minio/minio-go/v7 v7.0.18
github.com/prometheus/client_golang v1.11.0
github.com/rs/zerolog v1.26.1
github.com/sethvargo/go-limiter v0.7.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/speps/go-hashids/v2 v2.0.1
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sanity-io/litter v1.2.0/go.mod h1:JF6pZUFgu2Q0sBZ+HSV35P8TVPI1TTzEwyu9FXAw2W4=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sethvargo/go-limiter v0.7.2 h1:FgC4N7RMpV5gMrUdda15FaFTkQ/L4fEqM7seXMs4oO8=
github.com/sethvargo/go-limiter v0.7.2/go.mod h1:C0kbSFbiriE5k2FFOe18M1YZbAR2Fiwf72uGu0CXCcU=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
29 changes: 15 additions & 14 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package config

// Config contain global configuration
type Config struct {
AuthN string `flag:"authn" desc:"Authentication method (\"mock\", \"proxy\" or OIDC if URL)" default:"https://login.nunux.org/auth/realms/readflow"`
Broker string `flag:"broker" desc:"External event broker URI for outgoing events"`
DB string `flag:"db" desc:"Database connection string" default:"postgres://postgres:testpwd@localhost/readflow_test?sslmode=disable"`
ImageProxy string `flag:"image-proxy" desc:"Image proxy service (passthrough if empty)"`
ListenAddr string `flag:"listen-addr" desc:"HTTP listen address" default:":8080"`
ListenMetricsAddr string `flag:"listen-metrics" desc:"Metrics listen address"`
LogLevel string `flag:"log-level" desc:"Log level (debug, info, warn, error)" default:"info"`
LogPretty bool `flag:"log-pretty" desc:"Output human readable logs" default:"false"`
PocketConsumerKey string `flag:"pocket-consumer-key" desc:"Pocket consumer key"`
PublicURL string `flag:"public-url" desc:"Public URL" default:"https://api.readflow.app"`
SecretSalt string `flag:"secret-salt" desc:"Secret salt used by hash algorithms" default:"pepper"`
SentryDSN string `flag:"sentry-dsn" desc:"Sentry DSN URL"`
UserPlans string `flag:"user-plans" desc:"User plans definition file (deactivated if empty)"`
WebScraping string `flag:"web-scraping" desc:"Web Scraping service (internal if empty)"`
AuthN string `flag:"authn" desc:"Authentication method (\"mock\", \"proxy\" or OIDC if URL)" default:"https://login.nunux.org/auth/realms/readflow"`
Broker string `flag:"broker" desc:"External event broker URI for outgoing events"`
DB string `flag:"db" desc:"Database connection string" default:"postgres://postgres:testpwd@localhost/readflow_test?sslmode=disable"`
ImageProxy string `flag:"image-proxy" desc:"Image proxy service (passthrough if empty)"`
ListenAddr string `flag:"listen-addr" desc:"HTTP listen address" default:":8080"`
ListenMetricsAddr string `flag:"listen-metrics" desc:"Metrics listen address"`
LogLevel string `flag:"log-level" desc:"Log level (debug, info, warn, error)" default:"info"`
LogPretty bool `flag:"log-pretty" desc:"Output human readable logs" default:"false"`
NotificationRateLimiting string `flag:"notification-rate-limiting" desc:"Rate limiting configuration for notifications" default:"memory://?tokens=5&interval=24h"`
PocketConsumerKey string `flag:"pocket-consumer-key" desc:"Pocket consumer key"`
PublicURL string `flag:"public-url" desc:"Public URL" default:"https://api.readflow.app"`
SecretSalt string `flag:"secret-salt" desc:"Secret salt used by hash algorithms" default:"pepper"`
SentryDSN string `flag:"sentry-dsn" desc:"Sentry DSN URL"`
UserPlans string `flag:"user-plans" desc:"User plans definition file (deactivated if empty)"`
WebScraping string `flag:"web-scraping" desc:"Web Scraping service (internal if empty)"`
}
10 changes: 6 additions & 4 deletions pkg/event-listener/article-notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
"github.com/rs/zerolog/log"
)

const errorMessage = "unable to send notification"
const (
errorMessage = "unable to send notification"
maxUserInactivityBeforeNotification = 6 * time.Hour
)

var status string = "inbox"

Expand Down Expand Up @@ -58,9 +61,8 @@ func init() {
}

if globalStrategy {
// Send notification only if user logged in more than 5 minutes ago
// TODO use rate limiter instead of this
lastLoginDelay := time.Now().Add(-5 * time.Minute)
// Send notification only if user is inactive for a while
lastLoginDelay := time.Now().Add(-maxUserInactivityBeforeNotification)
if user.Enabled && user.LastLoginAt != nil && user.LastLoginAt.After(lastLoginDelay) {
return
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/rate-limiter/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ratelimiter

import (
"context"
"fmt"
"net/url"
"strconv"
"time"

"github.com/rs/zerolog/log"
"github.com/sethvargo/go-limiter/memorystore"
)

// RateLimiter is an interface use to apply rate limiting
type RateLimiter interface {
Take(ctx context.Context, key string) (tokens, remaining, reset uint64, ok bool, err error)
Close(ctx context.Context) error
}

// NewWebScraper create new Web Scraping service
func NewRateLimiter(uri string) (RateLimiter, error) {
if uri == "" {
return nil, nil
}
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("invalid configuration URI: %s", uri)
}

tokens := uint64(1)
if val := u.Query().Get("tokens"); val != "" {
tokens, _ = strconv.ParseUint(string(val), 10, 64)
}
interval := time.Hour
if val := u.Query().Get("interval"); val != "" {
interval, _ = time.ParseDuration(val)
}

switch u.Scheme {
case "memory":
store, err := memorystore.New(&memorystore.Config{
// Number of tokens allowed per interval.
Tokens: tokens,
// Interval until tokens reset.
Interval: interval,
})
if err != nil {
return nil, err
}
log.Info().Str("component", "rate-limiter").Str("uri", u.Redacted()).Msg("using in memory rate limiter")
return store, nil
default:
return nil, fmt.Errorf("unsupported rate limiter: %s", u.Scheme)
}
}
25 changes: 20 additions & 5 deletions pkg/service/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ncarlier/readflow/pkg/model"
)

const errNotification = "unable to notify user's devices"

// GetDevices get devices from current user
func (reg *Registry) GetDevices(ctx context.Context) (*[]model.Device, error) {
uid := getCurrentUserIDFromContext(ctx)
Expand Down Expand Up @@ -99,7 +101,7 @@ func (reg *Registry) DeleteDevices(ctx context.Context, ids []uint) (int64, erro
if err != nil {
reg.logger.Info().Err(err).Uint(
"uid", uid,
).Str("ids", idsStr).Msg("unable to delete devices")
).Str("ids", idsStr).Msg(errNotification)
return 0, err
}
reg.logger.Debug().Err(err).Uint(
Expand All @@ -112,7 +114,7 @@ func (reg *Registry) DeleteDevices(ctx context.Context, ids []uint) (int64, erro
func (reg *Registry) NotifyDevices(ctx context.Context, msg string) (int, error) {
user, err := reg.GetCurrentUser(ctx)
if err != nil {
reg.logger.Info().Err(err).Msg("unable to notify devices")
reg.logger.Info().Err(err).Msg(errNotification)
return 0, err
}
uid := *user.ID
Expand All @@ -121,11 +123,22 @@ func (reg *Registry) NotifyDevices(ctx context.Context, msg string) (int, error)
if err != nil {
reg.logger.Info().Err(err).Uint(
"uid", uid,
).Msg("unable to notify devices")
).Msg(errNotification)
return 0, err
}
counter := 0
for _, device := range *devices {
// Rate limiting
if _, _, _, ok, err := reg.notificationRateLimiter.Take(ctx, user.Username); err != nil || !ok {
if !ok {
err = errors.New("rate limiting activated")
}
reg.logger.Info().Err(err).Uint(
"uid", uid,
).Uint("device", *device.ID).Msg(errNotification)
continue
}
// Send notification
res, err := webpush.SendNotification([]byte(msg), device.Subscription, &webpush.Options{
Subscriber: user.Username,
VAPIDPublicKey: reg.properties.VAPIDPublicKey,
Expand All @@ -135,7 +148,7 @@ func (reg *Registry) NotifyDevices(ctx context.Context, msg string) (int, error)
if err != nil {
reg.logger.Info().Err(err).Uint(
"uid", uid,
).Uint("device", *device.ID).Msg("unable to notify user device")
).Uint("device", *device.ID).Msg(errNotification)
continue
}
if res.StatusCode == 410 {
Expand All @@ -149,7 +162,9 @@ func (reg *Registry) NotifyDevices(ctx context.Context, msg string) (int, error)
if res.StatusCode >= 400 {
reg.logger.Info().Err(errors.New(res.Status)).Uint(
"uid", uid,
).Uint("device", *device.ID).Msg("unable to send notification to user device")
).Uint(
"device", *device.ID,
).Int("status", res.StatusCode).Msg(errNotification)
continue
}
counter++
Expand Down
45 changes: 26 additions & 19 deletions pkg/service/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/ncarlier/readflow/pkg/exporter/all"
"github.com/ncarlier/readflow/pkg/helper"
"github.com/ncarlier/readflow/pkg/model"
ratelimiter "github.com/ncarlier/readflow/pkg/rate-limiter"
ruleengine "github.com/ncarlier/readflow/pkg/rule-engine"
"github.com/ncarlier/readflow/pkg/scraper"
userplan "github.com/ncarlier/readflow/pkg/user-plan"
Expand All @@ -20,16 +21,17 @@ var instance *Registry

// Registry is the structure definition of the service registry
type Registry struct {
conf config.Config
db db.DB
UserPlans userplan.UserPlans
logger zerolog.Logger
ruleEngineCache *ruleengine.Cache
downloadCache cache.Cache
properties *model.Properties
webScraper scraper.WebScraper
downloader exporter.Downloader
hashid *helper.HashIDHandler
conf config.Config
db db.DB
UserPlans userplan.UserPlans
logger zerolog.Logger
ruleEngineCache *ruleengine.Cache
downloadCache cache.Cache
properties *model.Properties
webScraper scraper.WebScraper
downloader exporter.Downloader
hashid *helper.HashIDHandler
notificationRateLimiter ratelimiter.RateLimiter
}

// Configure the global service registry
Expand All @@ -43,16 +45,21 @@ func Configure(conf config.Config, database db.DB, downloadCache cache.Cache, pl
if err != nil {
return err
}
notificationRateLimiter, err := ratelimiter.NewRateLimiter(conf.NotificationRateLimiting)
if err != nil {
return err
}
instance = &Registry{
conf: conf,
db: database,
UserPlans: plans,
logger: log.With().Str("component", "service").Logger(),
ruleEngineCache: ruleengine.NewRuleEngineCache(1024),
downloadCache: downloadCache,
webScraper: webScraper,
downloader: downloader,
hashid: hashid,
conf: conf,
db: database,
UserPlans: plans,
logger: log.With().Str("component", "service").Logger(),
ruleEngineCache: ruleengine.NewRuleEngineCache(1024),
downloadCache: downloadCache,
webScraper: webScraper,
downloader: downloader,
hashid: hashid,
notificationRateLimiter: notificationRateLimiter,
}
return instance.initProperties()
}
Expand Down

0 comments on commit 6a75c59

Please sign in to comment.