diff --git a/go.mod b/go.mod index 3a56a1c7d..75e0f56ed 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/xanzy/go-gitlab v0.26.0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 go.starlark.net v0.0.0-20200203144150-6677ee5c7211 + go.uber.org/zap v1.10.0 golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d gopkg.in/src-d/go-billy.v4 v4.3.2 diff --git a/internal/services/config/config.go b/internal/services/config/config.go index 108ae4d8f..073f6b86a 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -95,6 +95,7 @@ type Runservice struct { RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"` + RunLogExpireInterval time.Duration `yaml:"runLogExpireInterval"` } type Executor struct { @@ -257,6 +258,7 @@ var defaultConfig = Config{ Runservice: Runservice{ RunCacheExpireInterval: 7 * 24 * time.Hour, RunWorkspaceExpireInterval: 7 * 24 * time.Hour, + RunLogExpireInterval: 30 * 24 * time.Hour, }, Executor: Executor{ InitImage: InitImage{ diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index 1111a966f..9b4769044 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -49,6 +49,7 @@ var ( EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups") EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner") EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner") + EtcdLogCleanerLockKey = path.Join(EtcdLocksDir, "logcleaner") EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater") EtcdMaintenanceKey = "maintenance" diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index 09503f021..2a3545e27 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -384,6 +384,7 @@ func (s *Runservice) run(ctx context.Context) error { util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) }) util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) }) util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) }) + util.GoWait(&wg, func() { s.logCleanerLoop(ctx, s.c.RunLogExpireInterval) }) util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) }) util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) }) } diff --git a/internal/services/runservice/runservice_test.go b/internal/services/runservice/runservice_test.go new file mode 100644 index 000000000..6cbf75e43 --- /dev/null +++ b/internal/services/runservice/runservice_test.go @@ -0,0 +1,104 @@ +package runservice + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "agola.io/agola/internal/objectstorage" + "agola.io/agola/internal/services/config" + "agola.io/agola/internal/services/runservice/common" + "agola.io/agola/internal/services/runservice/store" + "agola.io/agola/internal/testutil" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +func setupEtcd(t *testing.T, log zerolog.Logger, dir string) *testutil.TestEmbeddedEtcd { + tetcd, err := testutil.NewTestEmbeddedEtcd(t, log, dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tetcd.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tetcd.WaitUp(30 * time.Second); err != nil { + t.Fatalf("error waiting on etcd up: %v", err) + } + return tetcd +} + +func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) { + if tetcd.Etcd != nil { + _ = tetcd.Kill() + } +} + +func setupRunservice(t *testing.T, ctx context.Context, dir string) (*Runservice, *testutil.TestEmbeddedEtcd, error) { + logger := log.Level(zerolog.DebugLevel) + + etcdDir := filepath.Join(dir, "etcd") + tetcd := setupEtcd(t, logger, etcdDir) + + config := config.Runservice{ + DataDir: filepath.Join(dir, "runservice"), + Etcd: config.Etcd{ + Endpoints: "", + }, + ObjectStorage: config.ObjectStorage{ + Type: "posix", + Path: filepath.Join(dir, "runservice/ost"), + }, + RunLogExpireInterval: time.Millisecond, + } + + config.Etcd.Endpoints = tetcd.Endpoint + + rs, err := NewRunservice(ctx, logger, &config) + return rs, tetcd, err +} + +func TestLogleaner(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rs, tetcd, err := setupRunservice(t, ctx, dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + defer shutdownEtcd(tetcd) + + body := ioutil.NopCloser(bytes.NewBufferString("log test")) + logPath := store.OSTRunTaskStepLogPath("tast01", 0) + + err = rs.ost.WriteObject(logPath, body, -1, false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + _, err = rs.ost.ReadObject(logPath) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + err = rs.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.EtcdLogCleanerLockKey, 1*time.Millisecond) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + _, err = rs.ost.ReadObject(logPath) + if err == nil || !objectstorage.IsNotExist(err) { + t.Fatalf("expected err NotExists, got: %v", err) + } +} diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 9ad04cc3d..ad30878c2 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -41,6 +41,7 @@ import ( const ( cacheCleanerInterval = 1 * 24 * time.Hour workspaceCleanerInterval = 1 * 24 * time.Hour + logCleanerInterval = 1 * 24 * time.Hour defaultExecutorNotAliveInterval = 60 * time.Second ) @@ -1413,7 +1414,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) { for { - if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil { + if err := s.objectsCleaner(ctx, store.OSTArchivesBaseDir(), common.EtcdWorkspaceCleanerLockKey, workspaceExpireInterval); err != nil { s.log.Err(err).Send() } @@ -1426,8 +1427,25 @@ func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireIn } } -func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error { - s.log.Debug().Msgf("workspaceCleaner") +func (s *Runservice) logCleanerLoop(ctx context.Context, logExpireInterval time.Duration) { + s.log.Debug().Msgf("logCleanerLoop") + + for { + if err := s.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.EtcdLogCleanerLockKey, logExpireInterval); err != nil { + s.log.Warn().Msgf("objectsCleaner error: %v", err) + } + + sleepCh := time.NewTimer(logCleanerInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (s *Runservice) objectsCleaner(ctx context.Context, prefix string, etcdLockKey string, objectExpireInterval time.Duration) error { + s.log.Debug().Msgf("objectsCleaner") session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { @@ -1435,7 +1453,7 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv } defer session.Close() - m := etcd.NewMutex(session, common.EtcdWorkspaceCleanerLockKey) + m := etcd.NewMutex(session, etcdLockKey) if err := m.TryLock(ctx); err != nil { if errors.Is(err, etcd.ErrLocked) { @@ -1447,14 +1465,14 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv doneCh := make(chan struct{}) defer close(doneCh) - for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) { + for object := range s.ost.List(prefix+"/", "", true, doneCh) { if object.Err != nil { return errors.WithStack(object.Err) } - if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) { + if object.LastModified.Add(objectExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { if !objectstorage.IsNotExist(err) { - s.log.Warn().Msgf("failed to delete workspace object %q: %v", object.Path, err) + s.log.Warn().Msgf("failed to delete object %q: %v", object.Path, err) } } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 8f28c97b1..5b34f809e 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -59,8 +59,12 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da return action, nil } +func OSTLogsBaseDir() string { + return "logs" +} + func OSTRunTaskLogsBaseDir(rtID string) string { - return path.Join("logs", rtID) + return path.Join(OSTLogsBaseDir(), rtID) } func OSTRunTaskLogsDataDir(rtID string) string {