Skip to content

Commit

Permalink
notification: add ability to delete older run webhooks automatically
Browse files Browse the repository at this point in the history
this path adds an option to permit the automatic deleting of the older run webhooks.
  • Loading branch information
alessandro-sorint committed Nov 21, 2023
1 parent bfc34f0 commit 152b689
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 28 deletions.
5 changes: 5 additions & 0 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type Notification struct {

WebhookURL string `yaml:"webhookURL"`
WebhookSecret string `yaml:"webhookSecret"`

RunWebhookExpireInterval time.Duration `yaml:"runWebhookExpireInterval"`
}

type Runservice struct {
Expand Down Expand Up @@ -358,6 +360,9 @@ var defaultConfig = func() *Config {
RepositoryCleanupInterval: 24 * time.Hour,
RepositoryRefsExpireInterval: 30 * 24 * time.Hour,
},
Notification: Notification{
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
}
}

Expand Down
62 changes: 34 additions & 28 deletions internal/services/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -223,9 +224,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -309,9 +311,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
Expand Down Expand Up @@ -448,9 +451,10 @@ gitserver:
},
Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"},
Notification: Notification{
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
ConfigstoreURL: "http://localhost:4002",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -593,14 +597,15 @@ gitserver:
RunserviceAPIToken: "internalservicesapitoken",
},
Notification: Notification{
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "internalservicesapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "internalservicesapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "internalservicesapitoken",
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "internalservicesapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "internalservicesapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "internalservicesapitoken",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down Expand Up @@ -754,14 +759,15 @@ gitserver:
RunserviceAPIToken: "runserviceapitoken",
},
Notification: Notification{
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "notificationapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "runserviceapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "configstoreapitoken",
DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"},
Web: Web{ListenAddress: ":4004"},
APIToken: "notificationapitoken",
WebExposedURL: "http://localhost:8000",
RunserviceURL: "http://localhost:4000",
RunserviceAPIToken: "runserviceapitoken",
ConfigstoreURL: "http://localhost:4002",
ConfigstoreAPIToken: "configstoreapitoken",
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: Runservice{
DataDir: "/data/agola/runservice",
Expand Down
12 changes: 12 additions & 0 deletions internal/services/notification/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,15 @@ func (d *DB) GetCommitStatuses(tx *sql.Tx, limit int) ([]*types.CommitStatus, er

return commitStatuses, errors.WithStack(err)
}

func (d *DB) DeleteRunWebhookDeliveriesByRunWebhookID(tx *sql.Tx, runWebhookID string) error {
q := sq.NewDeleteBuilder()
q.DeleteFrom("runwebhookdelivery")
q.Where(q.E("run_webhook_id", runWebhookID))

if _, err := d.exec(tx, q); err != nil {
return errors.Wrap(err, "failed to delete runWebhookdeliveries")
}

return nil
}
1 change: 1 addition & 0 deletions internal/services/notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (n *NotificationService) run(ctx context.Context) error {
util.GoWait(&wg, func() { n.runEventsHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.RunWebhookDeliveriesHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.CommitStatusDeliveriesHandlerLoop(ctx) })
util.GoWait(&wg, func() { n.runWebhooksCleanerLoop(ctx, n.c.RunWebhookExpireInterval) })

mainrouter := n.setupDefaultRouter()
httpServer := http.Server{
Expand Down
89 changes: 89 additions & 0 deletions internal/services/notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,27 @@ func createRunWebhookDelivery(t *testing.T, ctx context.Context, ns *Notificatio
return wd
}

func updateRunWebhookCreationDate(t *testing.T, ctx context.Context, ns *NotificationService, runWebhookID string, creationTime time.Time) {
err := ns.d.Do(ctx, func(tx *sql.Tx) error {
var err error
runWebhook, err := ns.d.GetRunWebhookByID(tx, runWebhookID)
if err != nil {
return errors.WithStack(err)
}

runWebhook.CreationTime = creationTime
err = ns.d.UpdateRunWebhook(tx, runWebhook)
if err != nil {
return errors.WithStack(err)
}

return nil
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}

func TestCommitStatusDelivery(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -709,6 +730,74 @@ func TestDeliveryStatusFromStringSlice(t *testing.T) {
}
}

func TestRunWebhooksCleaner(t *testing.T) {
t.Parallel()

dir := t.TempDir()
log := testutil.NewLogger(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ns := setupNotificationService(ctx, t, log, dir)

t.Logf("starting ns")

time.Sleep(1 * time.Second)

expectedRunWebhooks := make([]*types.RunWebhook, 0)

for i := 0; i < 5; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project01)
expectedRunWebhooks = append(expectedRunWebhooks, runWebhook)

createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)
}

for i := 0; i < 5; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project02)
expectedRunWebhooks = append(expectedRunWebhooks, runWebhook)

createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)
}

runWebhookCreationTime := time.Now().Add(-1 * time.Hour)
for i := 0; i < 5; i++ {
runWebhook := createRunWebhook(t, ctx, ns, project01)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)
createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)

updateRunWebhookCreationDate(t, ctx, ns, runWebhook.ID, runWebhookCreationTime)
}

err := ns.runWebhooksCleaner(ctx, 30*time.Minute)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

var runWebhooks []*types.RunWebhook
err = ns.d.Do(ctx, func(tx *sql.Tx) error {
var err error
runWebhooks, err = ns.d.GetRunWebhooks(tx, 0)
if err != nil {
return errors.WithStack(err)
}

return nil
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if len(runWebhooks) != len(expectedRunWebhooks) {
t.Fatalf("expected %d run webhooks got: %d", len(expectedRunWebhooks), len(runWebhooks))
}
if diff := cmpDiffObject(runWebhooks, expectedRunWebhooks); diff != "" {
t.Fatalf("mismatch (-want +got):\n%s", diff)
}
}

func cmpDiffObject(x, y interface{}) string {
// Since postgres has microsecond time precision while go has nanosecond time precision we should check times with a microsecond margin
return cmp.Diff(x, y, cmpopts.IgnoreFields(sqlg.ObjectMeta{}, "TxID"), cmpopts.EquateApproxTime(1*time.Microsecond))
Expand Down
77 changes: 77 additions & 0 deletions internal/services/notification/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package notification

import (
"context"
"time"

"github.com/sorintlab/errors"

"agola.io/agola/internal/services/common"
"agola.io/agola/internal/services/gateway/action"
"agola.io/agola/internal/services/notification/types"
"agola.io/agola/internal/sqlg/lock"
"agola.io/agola/internal/sqlg/sql"
nstypes "agola.io/agola/services/notification/types"
rstypes "agola.io/agola/services/runservice/types"
)

Expand All @@ -31,6 +37,10 @@ const (
agolaDeliveryHeader = "X-Agola-Delivery"

webhookVersion = 1

webhooksCleanerLockKey = "webhookscleaner"

maxRunWebhooksQueryLimit = 40
)

type AgolaEventType string
Expand Down Expand Up @@ -115,3 +125,70 @@ func (n *NotificationService) generatewebhook(ctx context.Context, ev *rstypes.R

return webhook
}

func (n *NotificationService) runWebhooksCleanerLoop(ctx context.Context, runWebhookExpireInterval time.Duration) {
n.log.Debug().Msgf("webhookCleanerLoop")

for {
if err := n.runWebhooksCleaner(ctx, runWebhookExpireInterval); err != nil {
n.log.Warn().Err(err).Msgf("webhooksCleaner error")
}

sleepCh := time.NewTimer(runWebhookExpireInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (n *NotificationService) runWebhooksCleaner(ctx context.Context, runWebhookExpireInterval time.Duration) error {
l := n.lf.NewLock(webhooksCleanerLockKey)
if err := l.TryLock(ctx); err != nil {
if errors.Is(err, lock.ErrLocked) {
return nil
}
return errors.WithStack(err)
}
defer func() { _ = l.Unlock() }()

for {
var runWebhooks []*nstypes.RunWebhook

err := n.d.Do(ctx, func(tx *sql.Tx) error {
var err error
runWebhooks, err = n.d.GetRunWebhooks(tx, maxRunWebhooksQueryLimit)
if err != nil {
return errors.WithStack(err)
}

for _, r := range runWebhooks {
if r.CreationTime.Add(runWebhookExpireInterval).After(time.Now()) {
continue
}

err = n.d.DeleteRunWebhookDeliveriesByRunWebhookID(tx, r.ID)
if err != nil {
return errors.WithStack(err)
}

err = n.d.DeleteRunWebhook(tx, r.ID)
if err != nil {
return errors.WithStack(err)
}
}

return nil
})
if err != nil {
return errors.WithStack(err)
}

if len(runWebhooks) < maxRunWebhooksQueryLimit {
break
}
}

return nil
}
1 change: 1 addition & 0 deletions tests/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func setup(ctx context.Context, t *testing.T, dir string, opts ...setupOption) *
ListenAddress: ":4004",
TLS: false,
},
RunWebhookExpireInterval: 7 * 24 * time.Hour,
},
Runservice: config.Runservice{
Debug: false,
Expand Down

0 comments on commit 152b689

Please sign in to comment.