Skip to content

Commit

Permalink
Merge pull request #457 from alessandro-sorint/webhooks-cleaner
Browse files Browse the repository at this point in the history
notification: delete old run webhooks
  • Loading branch information
sgotti authored Nov 29, 2023
2 parents 0a2737f + b11e838 commit 3af7f7a
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 28 deletions.
5 changes: 5 additions & 0 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type Notification struct {

WebhookURL string `yaml:"webhookURL"`
WebhookSecret string `yaml:"webhookSecret"`

RunWebhookExpireInterval time.Duration `yaml:"runWebhookExpireInterval"`
}

type Runservice struct {
Expand Down Expand Up @@ -358,6 +360,9 @@ var defaultConfig = func() *Config {
RepositoryCleanupInterval: 24 * time.Hour,
RepositoryRefsExpireInterval: 30 * 24 * time.Hour,
},
Notification: Notification{
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
}
}

Expand Down
62 changes: 34 additions & 28 deletions internal/services/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -223,9 +224,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -309,9 +311,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
Expand Down Expand Up @@ -448,9 +451,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -593,14 +597,15 @@ gitserver:
RunserviceAPIToken: "internalservicesapitoken",
},
Notification: Notification{
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "internalservicesapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "internalservicesapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "internalservicesapitoken",
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "internalservicesapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "internalservicesapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "internalservicesapitoken",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -754,14 +759,15 @@ gitserver:
RunserviceAPIToken: "runserviceapitoken",
},
Notification: Notification{
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "notificationapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "runserviceapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "configstoreapitoken",
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "notificationapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "runserviceapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "configstoreapitoken",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down
29 changes: 29 additions & 0 deletions internal/services/notification/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ func (d *DB) GetRunWebhooks(tx *sql.Tx, limit int) ([]*types.RunWebhook, error)
return runWebhooks, errors.WithStack(err)
}

func (d *DB) GetRunWebhooksAfterRunWebhookID(tx *sql.Tx, afterRunWebhookID string, limit int) ([]*types.RunWebhook, error) {
q := runWebhookSelect().OrderBy("id")
if afterRunWebhookID != "" {
q.Where(q.G("id", afterRunWebhookID))
}

if limit > 0 {
q.Limit(limit)
}
runWebhooks, _, err := d.fetchRunWebhooks(tx, q)
if err != nil {
return nil, errors.WithStack(err)
}

return runWebhooks, errors.WithStack(err)
}

func (d *DB) GetLastRunEventSequence(tx *sql.Tx) (*types.LastRunEventSequence, error) {
q := lastRunEventSequenceSelect()
lastRunEventSequences, _, err := d.fetchLastRunEventSequences(tx, q)
Expand Down Expand Up @@ -248,3 +265,15 @@ func (d *DB) GetCommitStatuses(tx *sql.Tx, limit int) ([]*types.CommitStatus, er

return commitStatuses, errors.WithStack(err)
}

func (d *DB) DeleteRunWebhookDeliveriesByRunWebhookID(tx *sql.Tx, runWebhookID string) error {
q := sq.NewDeleteBuilder()
q.DeleteFrom("runwebhookdelivery")
q.Where(q.E("run_webhook_id", runWebhookID))

if _, err := d.exec(tx, q); err != nil {
return errors.Wrap(err, "failed to delete runWebhookdeliveries")
}

return nil
}
1 change: 1 addition & 0 deletions internal/services/notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (n *NotificationService) run(ctx context.Context) error {
util.GoWait(&wg, func() { n.runEventsHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.RunWebhookDeliveriesHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.CommitStatusDeliveriesHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.runWebhooksCleanerLoop(ctx, n.c.RunWebhookExpireInterval) })

mainrouter := n.setupDefaultRouter()
httpServer := http.Server{
Expand Down
86 changes: 86 additions & 0 deletions internal/services/notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,26 @@ func createRunWebhookDelivery(t *testing.T, ctx context.Context, ns *Notificatio
return wd
}

func updateRunWebhookCreationDate(t *testing.T, ctx context.Context, ns *NotificationService, runWebhookID string, creationTime time.Time) {
err := ns.d.Do(ctx, func(tx *sql.Tx) error {
var err error
runWebhook, err := ns.d.GetRunWebhookByID(tx, runWebhookID)
if err != nil {
return errors.WithStack(err)
}

runWebhook.CreationTime = creationTime
if err := ns.d.UpdateRunWebhook(tx, runWebhook); err != nil {
return errors.WithStack(err)
}

return nil
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}

func TestCommitStatusDelivery(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -711,6 +731,72 @@ func TestDeliveryStatusFromStringSlice(t *testing.T) {
}
}

func TestRunWebhooksCleaner(t *testing.T) {
t.Parallel()

dir := t.TempDir()
log := testutil.NewLogger(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ns := setupNotificationService(ctx, t, log, dir)

t.Logf("starting ns")

expectedRunWebhooks := make([]*types.RunWebhook, 0)
expectedRunWebhookDeliveries := make([]*types.RunWebhookDelivery, 0)

for i := 0; i < 5; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project01)
expectedRunWebhooks = append(expectedRunWebhooks, runWebhook)

expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered))
expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered))
}

for i := 0; i < 5; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project02)
expectedRunWebhooks = append(expectedRunWebhooks, runWebhook)

expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered))
expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered))
}

runWebhookCreationTime := time.Now().Add(-1 * time.Hour)
for i := 0; i < 50; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project01)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)

updateRunWebhookCreationDate(t, ctx, ns, runWebhook.ID, runWebhookCreationTime)
}

err := ns.runWebhooksCleaner(ctx, 30*time.Minute)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

runWebhooks := getRunWebhooks(t, ctx, ns)
if len(runWebhooks) != len(expectedRunWebhooks) {
t.Fatalf("expected %d run webhooks got: %d", len(expectedRunWebhooks), len(runWebhooks))
}
if diff := cmpDiffObject(runWebhooks, expectedRunWebhooks); diff != "" {
t.Fatalf("mismatch (-want +got):\n%s", diff)
}

runWebhookDeliveries := getRunWebhookDeliveries(t, ctx, ns)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if len(runWebhookDeliveries) != len(expectedRunWebhookDeliveries) {
t.Fatalf("expected %d run webhooks got: %d", len(expectedRunWebhookDeliveries), len(runWebhookDeliveries))
}
if diff := cmpDiffObject(runWebhookDeliveries, expectedRunWebhookDeliveries); diff != "" {
t.Fatalf("mismatch (-want +got):\n%s", diff)
}
}

func cmpDiffObject(x, y interface{}) string {
// Since postgres has microsecond time precision while go has nanosecond time precision we should check times with a microsecond margin
return cmp.Diff(x, y, cmpopts.IgnoreFields(sqlg.ObjectMeta{}, "TxID"), cmpopts.EquateApproxTime(1*time.Microsecond))
Expand Down
82 changes: 82 additions & 0 deletions internal/services/notification/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package notification

import (
"context"
"time"

"github.com/sorintlab/errors"

"agola.io/agola/internal/services/common"
"agola.io/agola/internal/services/gateway/action"
"agola.io/agola/internal/services/notification/types"
"agola.io/agola/internal/sqlg/lock"
"agola.io/agola/internal/sqlg/sql"
nstypes "agola.io/agola/services/notification/types"
rstypes "agola.io/agola/services/runservice/types"
)

Expand All @@ -31,6 +37,12 @@ const (
agolaDeliveryHeader = "X-Agola-Delivery"

webhookVersion = 1

webhooksCleanerLockKey = "webhookscleaner"

maxRunWebhooksQueryLimit = 40

runWebhooksCleanerInterval = 1 * 24 * time.Hour
)

type AgolaEventType string
Expand Down Expand Up @@ -115,3 +127,73 @@ func (n *NotificationService) generatewebhook(ctx context.Context, ev *rstypes.R

return webhook
}

func (n *NotificationService) runWebhooksCleanerLoop(ctx context.Context, runWebhookExpireInterval time.Duration) {
n.log.Debug().Msgf("webhookCleanerLoop")

for {
if err := n.runWebhooksCleaner(ctx, runWebhookExpireInterval); err != nil {
n.log.Warn().Err(err).Msgf("webhooksCleaner error")
}

sleepCh := time.NewTimer(runWebhooksCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (n *NotificationService) runWebhooksCleaner(ctx context.Context, runWebhookExpireInterval time.Duration) error {
l := n.lf.NewLock(webhooksCleanerLockKey)
if err := l.TryLock(ctx); err != nil {
if errors.Is(err, lock.ErrLocked) {
return nil
}
return errors.WithStack(err)
}
defer func() { _ = l.Unlock() }()

for {
var runWebhooks []*nstypes.RunWebhook
var afterRunWebhookID string

err := n.d.Do(ctx, func(tx *sql.Tx) error {
var err error
runWebhooks, err = n.d.GetRunWebhooksAfterRunWebhookID(tx, afterRunWebhookID, maxRunWebhooksQueryLimit)
if err != nil {
return errors.WithStack(err)
}

for _, r := range runWebhooks {
if time.Since(r.CreationTime) < runWebhookExpireInterval {
continue
}

err = n.d.DeleteRunWebhookDeliveriesByRunWebhookID(tx, r.ID)
if err != nil {
return errors.WithStack(err)
}

err = n.d.DeleteRunWebhook(tx, r.ID)
if err != nil {
return errors.WithStack(err)
}
}

return nil
})
if err != nil {
return errors.WithStack(err)
}

if len(runWebhooks) < maxRunWebhooksQueryLimit {
break
}

afterRunWebhookID = runWebhooks[len(runWebhooks)-1].ID
}

return nil
}
1 change: 1 addition & 0 deletions tests/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func setup(ctx context.Context, t *testing.T, dir string, opts ...setupOption) *
ListenAddress: ":4004",
TLS: false,
},
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: config.Runservice{
Debug: false,
Expand Down

0 comments on commit 3af7f7a

Please sign in to comment.