Skip to content

Commit

Permalink
Runservice: added logCleanerLoop for deleting older logs
Browse files Browse the repository at this point in the history
  • Loading branch information
alessandro-sorint authored and sgotti committed Feb 15, 2022
1 parent d0d219c commit 7d4313e
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 8 deletions.
2 changes: 2 additions & 0 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Runservice struct {

RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
RunLogExpireInterval time.Duration `yaml:"runLogExpireInterval"`
}

type Executor struct {
Expand Down Expand Up @@ -257,6 +258,7 @@ var defaultConfig = Config{
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
RunLogExpireInterval: 30 * 24 * time.Hour,
},
Executor: Executor{
InitImage: InitImage{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups")
EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner")
EtcdLogCleanerLockKey = path.Join(EtcdLocksDir, "logcleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater")

EtcdMaintenanceKey = "maintenance"
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (s *Runservice) run(ctx context.Context) error {
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
util.GoWait(&wg, func() { s.logCleanerLoop(ctx, s.c.RunLogExpireInterval) })
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
}
Expand Down
104 changes: 104 additions & 0 deletions internal/services/runservice/runservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package runservice

import (
"bytes"
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/services/config"
"agola.io/agola/internal/services/runservice/common"
"agola.io/agola/internal/services/runservice/store"
"agola.io/agola/internal/testutil"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

func setupEtcd(t *testing.T, logger *zap.Logger, dir string) *testutil.TestEmbeddedEtcd {
tetcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tetcd.WaitUp(30 * time.Second); err != nil {
t.Fatalf("error waiting on etcd up: %v", err)
}
return tetcd
}

func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) {
if tetcd.Etcd != nil {
_ = tetcd.Kill()
}
}

func setupRunservice(t *testing.T, ctx context.Context, dir string) (*Runservice, *testutil.TestEmbeddedEtcd, error) {
logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

etcdDir := filepath.Join(dir, "etcd")
tetcd := setupEtcd(t, logger, etcdDir)

config := config.Runservice{
DataDir: filepath.Join(dir, "runservice"),
Etcd: config.Etcd{
Endpoints: "",
},
ObjectStorage: config.ObjectStorage{
Type: "posix",
Path: filepath.Join(dir, "runservice/ost"),
},
RunLogExpireInterval: time.Millisecond,
}

config.Etcd.Endpoints = tetcd.Endpoint

rs, err := NewRunservice(ctx, logger, &config)
return rs, tetcd, err
}

func TestLogleaner(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rs, tetcd, err := setupRunservice(t, ctx, dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

defer shutdownEtcd(tetcd)

body := ioutil.NopCloser(bytes.NewBufferString("log test"))
logPath := store.OSTRunTaskStepLogPath("tast01", 0)

err = rs.ost.WriteObject(logPath, body, -1, false)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

_, err = rs.ost.ReadObject(logPath)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

err = rs.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.EtcdLogCleanerLockKey, 1*time.Millisecond)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

_, err = rs.ost.ReadObject(logPath)
if err == nil || !objectstorage.IsNotExist(err) {
t.Fatalf("expected err NotExists, got: %v", err)
}
}
30 changes: 23 additions & 7 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
const (
cacheCleanerInterval = 1 * 24 * time.Hour
workspaceCleanerInterval = 1 * 24 * time.Hour
logCleanerInterval = 1 * 24 * time.Hour

defaultExecutorNotAliveInterval = 60 * time.Second
)
Expand Down Expand Up @@ -1412,7 +1413,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.

func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
for {
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
if err := s.objectsCleaner(ctx, store.OSTArchivesBaseDir(), common.EtcdWorkspaceCleanerLockKey, workspaceExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}

Expand All @@ -1425,16 +1426,31 @@ func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireIn
}
}

func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
log.Debugf("workspaceCleaner")
func (s *Runservice) logCleanerLoop(ctx context.Context, logExpireInterval time.Duration) {
for {
if err := s.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.EtcdLogCleanerLockKey, logExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}

sleepCh := time.NewTimer(logCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (s *Runservice) objectsCleaner(ctx context.Context, prefix string, etcdLockKey string, objectExpireInterval time.Duration) error {
log.Debug("ObjectsCleaner")

session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()

m := etcd.NewMutex(session, common.EtcdWorkspaceCleanerLockKey)
m := etcd.NewMutex(session, etcdLockKey)

if err := m.TryLock(ctx); err != nil {
if errors.Is(err, etcd.ErrLocked) {
Expand All @@ -1446,14 +1462,14 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv

doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
for object := range s.ost.List(prefix+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
if object.LastModified.Add(objectExpireInterval).Before(time.Now()) {
if err := s.ost.DeleteObject(object.Path); err != nil {
if !objectstorage.IsNotExist(err) {
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
log.Warnf("failed to delete object %q: %v", object.Path, err)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/services/runservice/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da
return action, nil
}

func OSTLogsBaseDir() string {
return "logs"
}

func OSTRunTaskLogsBaseDir(rtID string) string {
return path.Join("logs", rtID)
return path.Join(OSTLogsBaseDir(), rtID)
}

func OSTRunTaskLogsDataDir(rtID string) string {
Expand Down

0 comments on commit 7d4313e

Please sign in to comment.