diff --git a/internal/services/config/config.go b/internal/services/config/config.go index 841968d34..18a8a1e55 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -98,6 +98,7 @@ type Runservice struct { RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"` + RunLogExpireInterval time.Duration `yaml:"runLogExpireInterval"` } type Executor struct { @@ -255,6 +256,7 @@ var defaultConfig = Config{ Runservice: Runservice{ RunCacheExpireInterval: 7 * 24 * time.Hour, RunWorkspaceExpireInterval: 7 * 24 * time.Hour, + RunLogExpireInterval: 30 * 24 * time.Hour, }, Executor: Executor{ InitImage: InitImage{ diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index 2bfc24df8..1742b46ad 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -37,6 +37,7 @@ const ( CacheCleanerLockKey = "cachecleaner" WorkspaceCleanerLockKey = "workspacecleaner" + LogCleanerLockKey = "logcleaner" TaskUpdaterLockKey = "taskupdater" ) diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index 9e656e6f2..02f9e6e04 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -286,6 +286,7 @@ func (s *Runservice) run(ctx context.Context) error { util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) }) util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) }) util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) }) + util.GoWait(&wg, func() { s.logCleanerLoop(ctx, s.c.RunLogExpireInterval) }) util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) }) } diff --git a/internal/services/runservice/runservice_test.go b/internal/services/runservice/runservice_test.go index 5fcb9c97b..759d42bc7 100644 --- a/internal/services/runservice/runservice_test.go +++ b/internal/services/runservice/runservice_test.go @@ -26,8 +26,11 @@ import ( "time" "agola.io/agola/internal/errors" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/services/config" "agola.io/agola/internal/services/runservice/action" + "agola.io/agola/internal/services/runservice/common" + "agola.io/agola/internal/services/runservice/store" "agola.io/agola/internal/sql" "agola.io/agola/internal/testutil" "agola.io/agola/internal/util" @@ -280,3 +283,37 @@ func TestGetRunsLastRun(t *testing.T) { } } } + +func TestLogleaner(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + log := testutil.NewLogger(t) + + rs := setupRunservice(ctx, t, log, dir) + rs.c.RunCacheExpireInterval = 604800000000000 + + body := ioutil.NopCloser(bytes.NewBufferString("log test")) + logPath := store.OSTRunTaskStepLogPath("task01", 0) + + err := rs.ost.WriteObject(logPath, body, -1, false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + _, err = rs.ost.ReadObject(logPath) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + time.Sleep(1 * time.Second) + + err = rs.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.LogCleanerLockKey, 1*time.Millisecond) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + _, err = rs.ost.ReadObject(logPath) + if err == nil || !objectstorage.IsNotExist(err) { + t.Fatalf("expected err NotExists, got: %v", err) + } +} diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 65ace5b20..20fe79187 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -39,6 +39,7 @@ const ( changeGroupCompactorInterval = 1 * time.Minute cacheCleanerInterval = 1 * 24 * time.Hour workspaceCleanerInterval = 1 * 24 * time.Hour + logCleanerInterval = 1 * 24 * time.Hour defaultExecutorNotAliveInterval = 60 * time.Second @@ -1589,7 +1590,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) { for { - if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil { + if err := s.objectsCleaner(ctx, store.OSTArchivesBaseDir(), common.WorkspaceCleanerLockKey, workspaceExpireInterval); err != nil { s.log.Err(err).Send() } @@ -1602,25 +1603,42 @@ func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireIn } } -func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error { - s.log.Debug().Msgf("workspaceCleaner") +func (s *Runservice) logCleanerLoop(ctx context.Context, logExpireInterval time.Duration) { + s.log.Debug().Msgf("logCleanerLoop") + + for { + if err := s.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.LogCleanerLockKey, logExpireInterval); err != nil { + s.log.Warn().Msgf("objectsCleaner error: %v", err) + } + + sleepCh := time.NewTimer(logCleanerInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (s *Runservice) objectsCleaner(ctx context.Context, prefix string, etcdLockKey string, objectExpireInterval time.Duration) error { + s.log.Debug().Msgf("objectsCleaner") l := s.lf.NewLock(common.WorkspaceCleanerLockKey) if err := l.Lock(ctx); err != nil { - return errors.Wrap(err, "failed to acquire workspace cleaner lock") + return errors.Wrap(err, "failed to acquire object cleaner lock") } defer func() { _ = l.Unlock() }() doneCh := make(chan struct{}) defer close(doneCh) - for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) { + for object := range s.ost.List(prefix+"/", "", true, doneCh) { if object.Err != nil { return object.Err } - if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) { + if object.LastModified.Add(objectExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { if !objectstorage.IsNotExist(err) { - s.log.Warn().Msgf("failed to delete workspace object %q: %v", object.Path, err) + s.log.Warn().Msgf("failed to delete object %q: %v", object.Path, err) } } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index e9db41950..da6df12c0 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -24,7 +24,11 @@ import ( ) func OSTRunTaskLogsBaseDir(rtID string) string { - return path.Join("logs", rtID) + return path.Join(OSTLogsBaseDir(), rtID) +} + +func OSTLogsBaseDir() string { + return "logs" } func OSTRunTaskLogsDataDir(rtID string) string {