diff --git a/internal/services/config/config.go b/internal/services/config/config.go index f18360853..b5db7c13b 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -134,6 +134,8 @@ type Notification struct { WebhookURL string `yaml:"webhookURL"` WebhookSecret string `yaml:"webhookSecret"` + + RunWebhookExpireInterval time.Duration `yaml:"runWebhookExpireInterval"` } type Runservice struct { @@ -358,6 +360,9 @@ var defaultConfig = func() *Config { RepositoryCleanupInterval: 24 * time.Hour, RepositoryRefsExpireInterval: 30 * 24 * time.Hour, }, + Notification: Notification{ + RunWebhookExpireInterval: 7 * 24 * time.Hour, + }, } } diff --git a/internal/services/config/config_test.go b/internal/services/config/config_test.go index ba3cc14d7..e8716dca0 100644 --- a/internal/services/config/config_test.go +++ b/internal/services/config/config_test.go @@ -115,9 +115,10 @@ gitserver: }, Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"}, Notification: Notification{ - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - ConfigstoreURL: "http://localhost:4002", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + ConfigstoreURL: "http://localhost:4002", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ DataDir: "/data/agola/runservice", @@ -223,9 +224,10 @@ gitserver: }, Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"}, Notification: Notification{ - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - ConfigstoreURL: "http://localhost:4002", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + ConfigstoreURL: "http://localhost:4002", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ DataDir: "/data/agola/runservice", @@ -309,9 +311,10 @@ gitserver: }, Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"}, Notification: Notification{ - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - ConfigstoreURL: "http://localhost:4002", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + ConfigstoreURL: "http://localhost:4002", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ RunCacheExpireInterval: 7 * 24 * time.Hour, @@ -448,9 +451,10 @@ gitserver: }, Scheduler: Scheduler{RunserviceURL: "http://localhost:4000"}, Notification: Notification{ - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - ConfigstoreURL: "http://localhost:4002", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + ConfigstoreURL: "http://localhost:4002", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ DataDir: "/data/agola/runservice", @@ -593,14 +597,15 @@ gitserver: RunserviceAPIToken: "internalservicesapitoken", }, Notification: Notification{ - DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"}, - Web: Web{ListenAddress: ":4004"}, - APIToken: "internalservicesapitoken", - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - RunserviceAPIToken: "internalservicesapitoken", - ConfigstoreURL: "http://localhost:4002", - ConfigstoreAPIToken: "internalservicesapitoken", + DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"}, + Web: Web{ListenAddress: ":4004"}, + APIToken: "internalservicesapitoken", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + RunserviceAPIToken: "internalservicesapitoken", + ConfigstoreURL: "http://localhost:4002", + ConfigstoreAPIToken: "internalservicesapitoken", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ DataDir: "/data/agola/runservice", @@ -754,14 +759,15 @@ gitserver: RunserviceAPIToken: "runserviceapitoken", }, Notification: Notification{ - DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"}, - Web: Web{ListenAddress: ":4004"}, - APIToken: "notificationapitoken", - WebExposedURL: "http://localhost:8000", - RunserviceURL: "http://localhost:4000", - RunserviceAPIToken: "runserviceapitoken", - ConfigstoreURL: "http://localhost:4002", - ConfigstoreAPIToken: "configstoreapitoken", + DB: DB{Type: "sqlite3", ConnString: "/data/agola/notification/db"}, + Web: Web{ListenAddress: ":4004"}, + APIToken: "notificationapitoken", + WebExposedURL: "http://localhost:8000", + RunserviceURL: "http://localhost:4000", + RunserviceAPIToken: "runserviceapitoken", + ConfigstoreURL: "http://localhost:4002", + ConfigstoreAPIToken: "configstoreapitoken", + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: Runservice{ DataDir: "/data/agola/runservice", diff --git a/internal/services/notification/db/db.go b/internal/services/notification/db/db.go index da0d51d95..467af0e56 100644 --- a/internal/services/notification/db/db.go +++ b/internal/services/notification/db/db.go @@ -186,6 +186,23 @@ func (d *DB) GetRunWebhooks(tx *sql.Tx, limit int) ([]*types.RunWebhook, error) return runWebhooks, errors.WithStack(err) } +func (d *DB) GetRunWebhooksAfterRunWebhookID(tx *sql.Tx, afterRunWebhookID string, limit int) ([]*types.RunWebhook, error) { + q := runWebhookSelect().OrderBy("id") + if afterRunWebhookID != "" { + q.Where(q.G("id", afterRunWebhookID)) + } + + if limit > 0 { + q.Limit(limit) + } + runWebhooks, _, err := d.fetchRunWebhooks(tx, q) + if err != nil { + return nil, errors.WithStack(err) + } + + return runWebhooks, errors.WithStack(err) +} + func (d *DB) GetLastRunEventSequence(tx *sql.Tx) (*types.LastRunEventSequence, error) { q := lastRunEventSequenceSelect() lastRunEventSequences, _, err := d.fetchLastRunEventSequences(tx, q) @@ -248,3 +265,15 @@ func (d *DB) GetCommitStatuses(tx *sql.Tx, limit int) ([]*types.CommitStatus, er return commitStatuses, errors.WithStack(err) } + +func (d *DB) DeleteRunWebhookDeliveriesByRunWebhookID(tx *sql.Tx, runWebhookID string) error { + q := sq.NewDeleteBuilder() + q.DeleteFrom("runwebhookdelivery") + q.Where(q.E("run_webhook_id", runWebhookID)) + + if _, err := d.exec(tx, q); err != nil { + return errors.Wrap(err, "failed to delete runWebhookdeliveries") + } + + return nil +} diff --git a/internal/services/notification/notification.go b/internal/services/notification/notification.go index c8611ceb4..f6d96f2dc 100644 --- a/internal/services/notification/notification.go +++ b/internal/services/notification/notification.go @@ -179,6 +179,7 @@ func (n *NotificationService) run(ctx context.Context) error { util.GoWait(&wg, func() { n.runEventsHandlerLoop(ctx) }) util.GoWait(&wg, func() { n.RunWebhookDeliveriesHandlerLoop(ctx) }) util.GoWait(&wg, func() { n.CommitStatusDeliveriesHandlerLoop(ctx) }) + util.GoWait(&wg, func() { n.runWebhooksCleanerLoop(ctx, n.c.RunWebhookExpireInterval) }) mainrouter := n.setupDefaultRouter() httpServer := http.Server{ diff --git a/internal/services/notification/notification_test.go b/internal/services/notification/notification_test.go index 793af9a07..63cc227f9 100644 --- a/internal/services/notification/notification_test.go +++ b/internal/services/notification/notification_test.go @@ -377,6 +377,26 @@ func createRunWebhookDelivery(t *testing.T, ctx context.Context, ns *Notificatio return wd } +func updateRunWebhookCreationDate(t *testing.T, ctx context.Context, ns *NotificationService, runWebhookID string, creationTime time.Time) { + err := ns.d.Do(ctx, func(tx *sql.Tx) error { + var err error + runWebhook, err := ns.d.GetRunWebhookByID(tx, runWebhookID) + if err != nil { + return errors.WithStack(err) + } + + runWebhook.CreationTime = creationTime + if err := ns.d.UpdateRunWebhook(tx, runWebhook); err != nil { + return errors.WithStack(err) + } + + return nil + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } +} + func TestCommitStatusDelivery(t *testing.T) { t.Parallel() @@ -711,6 +731,74 @@ func TestDeliveryStatusFromStringSlice(t *testing.T) { } } +func TestRunWebhooksCleaner(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + log := testutil.NewLogger(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ns := setupNotificationService(ctx, t, log, dir) + + t.Logf("starting ns") + + time.Sleep(1 * time.Second) + + expectedRunWebhooks := make([]*types.RunWebhook, 0) + expectedRunWebhookDeliveries := make([]*types.RunWebhookDelivery, 0) + + for i := 0; i < 5; i++ { + runWebhook := createRunWebhook(t, ctx, ns, project01) + expectedRunWebhooks = append(expectedRunWebhooks, runWebhook) + + expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)) + expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)) + } + + for i := 0; i < 5; i++ { + runWebhook := createRunWebhook(t, ctx, ns, project02) + expectedRunWebhooks = append(expectedRunWebhooks, runWebhook) + + expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered)) + expectedRunWebhookDeliveries = append(expectedRunWebhookDeliveries, createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered)) + } + + runWebhookCreationTime := time.Now().Add(-1 * time.Hour) + for i := 0; i < 50; i++ { + runWebhook := createRunWebhook(t, ctx, ns, project01) + createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusDelivered) + createRunWebhookDelivery(t, ctx, ns, runWebhook.ID, types.DeliveryStatusNotDelivered) + + updateRunWebhookCreationDate(t, ctx, ns, runWebhook.ID, runWebhookCreationTime) + } + + err := ns.runWebhooksCleaner(ctx, 30*time.Minute) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + runWebhooks := getRunWebhooks(t, ctx, ns) + if len(runWebhooks) != len(expectedRunWebhooks) { + t.Fatalf("expected %d run webhooks got: %d", len(expectedRunWebhooks), len(runWebhooks)) + } + if diff := cmpDiffObject(runWebhooks, expectedRunWebhooks); diff != "" { + t.Fatalf("mismatch (-want +got):\n%s", diff) + } + + runWebhookDeliveries := getRunWebhookDeliveries(t, ctx, ns) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(runWebhookDeliveries) != len(expectedRunWebhookDeliveries) { + t.Fatalf("expected %d run webhooks got: %d", len(expectedRunWebhookDeliveries), len(runWebhookDeliveries)) + } + if diff := cmpDiffObject(runWebhookDeliveries, expectedRunWebhookDeliveries); diff != "" { + t.Fatalf("mismatch (-want +got):\n%s", diff) + } +} + func cmpDiffObject(x, y interface{}) string { // Since postgres has microsecond time precision while go has nanosecond time precision we should check times with a microsecond margin return cmp.Diff(x, y, cmpopts.IgnoreFields(sqlg.ObjectMeta{}, "TxID"), cmpopts.EquateApproxTime(1*time.Microsecond)) diff --git a/internal/services/notification/webhooks.go b/internal/services/notification/webhooks.go index 29de58304..f3e562c7d 100644 --- a/internal/services/notification/webhooks.go +++ b/internal/services/notification/webhooks.go @@ -16,10 +16,16 @@ package notification import ( "context" + "time" + + "github.com/sorintlab/errors" "agola.io/agola/internal/services/common" "agola.io/agola/internal/services/gateway/action" "agola.io/agola/internal/services/notification/types" + "agola.io/agola/internal/sqlg/lock" + "agola.io/agola/internal/sqlg/sql" + nstypes "agola.io/agola/services/notification/types" rstypes "agola.io/agola/services/runservice/types" ) @@ -31,6 +37,10 @@ const ( agolaDeliveryHeader = "X-Agola-Delivery" webhookVersion = 1 + + webhooksCleanerLockKey = "webhookscleaner" + + maxRunWebhooksQueryLimit = 40 ) type AgolaEventType string @@ -115,3 +125,73 @@ func (n *NotificationService) generatewebhook(ctx context.Context, ev *rstypes.R return webhook } + +func (n *NotificationService) runWebhooksCleanerLoop(ctx context.Context, runWebhookExpireInterval time.Duration) { + n.log.Debug().Msgf("webhookCleanerLoop") + + for { + if err := n.runWebhooksCleaner(ctx, runWebhookExpireInterval); err != nil { + n.log.Warn().Err(err).Msgf("webhooksCleaner error") + } + + sleepCh := time.NewTimer(runWebhookExpireInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (n *NotificationService) runWebhooksCleaner(ctx context.Context, runWebhookExpireInterval time.Duration) error { + l := n.lf.NewLock(webhooksCleanerLockKey) + if err := l.TryLock(ctx); err != nil { + if errors.Is(err, lock.ErrLocked) { + return nil + } + return errors.WithStack(err) + } + defer func() { _ = l.Unlock() }() + + for { + var runWebhooks []*nstypes.RunWebhook + var afterRunWebhookID string + + err := n.d.Do(ctx, func(tx *sql.Tx) error { + var err error + runWebhooks, err = n.d.GetRunWebhooksAfterRunWebhookID(tx, afterRunWebhookID, maxRunWebhooksQueryLimit) + if err != nil { + return errors.WithStack(err) + } + + for _, r := range runWebhooks { + if time.Since(r.CreationTime) < runWebhookExpireInterval { + continue + } + + err = n.d.DeleteRunWebhookDeliveriesByRunWebhookID(tx, r.ID) + if err != nil { + return errors.WithStack(err) + } + + err = n.d.DeleteRunWebhook(tx, r.ID) + if err != nil { + return errors.WithStack(err) + } + } + + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + if len(runWebhooks) < maxRunWebhooksQueryLimit { + break + } + + afterRunWebhookID = runWebhooks[len(runWebhooks)-1].ID + } + + return nil +} diff --git a/tests/setup_test.go b/tests/setup_test.go index da86d3826..ca6b5dfea 100644 --- a/tests/setup_test.go +++ b/tests/setup_test.go @@ -351,6 +351,7 @@ func setup(ctx context.Context, t *testing.T, dir string, opts ...setupOption) * ListenAddress: ":4004", TLS: false, }, + RunWebhookExpireInterval: 7 * 24 * time.Hour, }, Runservice: config.Runservice{ Debug: false,