Skip to content

Commit

Permalink
Merge pull request #294 from alessandro-sorint/delete-run-logs
Browse files Browse the repository at this point in the history
runservice: added logcleanerLoop for deleting older logs
  • Loading branch information
sgotti authored Jul 13, 2022
2 parents 529c81d + 0cf6a0a commit 20ca985
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 8 deletions.
2 changes: 2 additions & 0 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Runservice struct {

RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
RunLogExpireInterval time.Duration `yaml:"runLogExpireInterval"`
}

type Executor struct {
Expand Down Expand Up @@ -255,6 +256,7 @@ var defaultConfig = Config{
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
RunLogExpireInterval: 30 * 24 * time.Hour,
},
Executor: Executor{
InitImage: InitImage{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (

CacheCleanerLockKey = "cachecleaner"
WorkspaceCleanerLockKey = "workspacecleaner"
LogCleanerLockKey = "logcleaner"
TaskUpdaterLockKey = "taskupdater"
)

Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (s *Runservice) run(ctx context.Context) error {
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
util.GoWait(&wg, func() { s.logCleanerLoop(ctx, s.c.RunLogExpireInterval) })
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
}

Expand Down
37 changes: 37 additions & 0 deletions internal/services/runservice/runservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"time"

"agola.io/agola/internal/errors"
"agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/services/config"
"agola.io/agola/internal/services/runservice/action"
"agola.io/agola/internal/services/runservice/common"
"agola.io/agola/internal/services/runservice/store"
"agola.io/agola/internal/sql"
"agola.io/agola/internal/testutil"
"agola.io/agola/internal/util"
Expand Down Expand Up @@ -280,3 +283,37 @@ func TestGetRunsLastRun(t *testing.T) {
}
}
}

func TestLogleaner(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
log := testutil.NewLogger(t)

rs := setupRunservice(ctx, t, log, dir)
rs.c.RunCacheExpireInterval = 604800000000000

body := ioutil.NopCloser(bytes.NewBufferString("log test"))
logPath := store.OSTRunTaskStepLogPath("task01", 0)

err := rs.ost.WriteObject(logPath, body, -1, false)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

_, err = rs.ost.ReadObject(logPath)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

time.Sleep(1 * time.Second)

err = rs.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.LogCleanerLockKey, 1*time.Millisecond)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

_, err = rs.ost.ReadObject(logPath)
if err == nil || !objectstorage.IsNotExist(err) {
t.Fatalf("expected err NotExists, got: %v", err)
}
}
32 changes: 25 additions & 7 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
changeGroupCompactorInterval = 1 * time.Minute
cacheCleanerInterval = 1 * 24 * time.Hour
workspaceCleanerInterval = 1 * 24 * time.Hour
logCleanerInterval = 1 * 24 * time.Hour

defaultExecutorNotAliveInterval = 60 * time.Second

Expand Down Expand Up @@ -1589,7 +1590,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.

func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
for {
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
if err := s.objectsCleaner(ctx, store.OSTArchivesBaseDir(), common.WorkspaceCleanerLockKey, workspaceExpireInterval); err != nil {
s.log.Err(err).Send()
}

Expand All @@ -1602,25 +1603,42 @@ func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireIn
}
}

func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
s.log.Debug().Msgf("workspaceCleaner")
func (s *Runservice) logCleanerLoop(ctx context.Context, logExpireInterval time.Duration) {
s.log.Debug().Msgf("logCleanerLoop")

for {
if err := s.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.LogCleanerLockKey, logExpireInterval); err != nil {
s.log.Warn().Msgf("objectsCleaner error: %v", err)
}

sleepCh := time.NewTimer(logCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (s *Runservice) objectsCleaner(ctx context.Context, prefix string, etcdLockKey string, objectExpireInterval time.Duration) error {
s.log.Debug().Msgf("objectsCleaner")

l := s.lf.NewLock(common.WorkspaceCleanerLockKey)
if err := l.Lock(ctx); err != nil {
return errors.Wrap(err, "failed to acquire workspace cleaner lock")
return errors.Wrap(err, "failed to acquire object cleaner lock")
}
defer func() { _ = l.Unlock() }()

doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
for object := range s.ost.List(prefix+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
if object.LastModified.Add(objectExpireInterval).Before(time.Now()) {
if err := s.ost.DeleteObject(object.Path); err != nil {
if !objectstorage.IsNotExist(err) {
s.log.Warn().Msgf("failed to delete workspace object %q: %v", object.Path, err)
s.log.Warn().Msgf("failed to delete object %q: %v", object.Path, err)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/services/runservice/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
)

func OSTRunTaskLogsBaseDir(rtID string) string {
return path.Join("logs", rtID)
return path.Join(OSTLogsBaseDir(), rtID)
}

func OSTLogsBaseDir() string {
return "logs"
}

func OSTRunTaskLogsDataDir(rtID string) string {
Expand Down

0 comments on commit 20ca985

Please sign in to comment.