Skip to content

Commit

Permalink
Runservice: added logCleanerLoop for deleting older logs
Browse files Browse the repository at this point in the history
  • Loading branch information
alessandro-sorint committed Feb 10, 2022
1 parent 6081787 commit fc49433
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 29 deletions.
2 changes: 2 additions & 0 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Runservice struct {

RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
RunLogExpireInterval time.Duration `yaml:"runLogExpireInterval"`
}

type Executor struct {
Expand Down Expand Up @@ -257,6 +258,7 @@ var defaultConfig = Config{
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
RunLogExpireInterval: 30 * 24 * time.Hour,
},
Executor: Executor{
InitImage: InitImage{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups")
EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner")
EtcdLogCleanerLockKey = path.Join(EtcdLocksDir, "logcleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater")

EtcdMaintenanceKey = "maintenance"
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (s *Runservice) run(ctx context.Context) error {
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
util.GoWait(&wg, func() { s.logCleanerLoop(ctx, s.c.RunLogExpireInterval) })
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
}
Expand Down
30 changes: 23 additions & 7 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
const (
cacheCleanerInterval = 1 * 24 * time.Hour
workspaceCleanerInterval = 1 * 24 * time.Hour
logCleanerInterval = 1 * 24 * time.Hour

defaultExecutorNotAliveInterval = 60 * time.Second
)
Expand Down Expand Up @@ -1412,7 +1413,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.

func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
for {
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
if err := s.ObjectsCleaner(ctx, store.OSTArchivesBaseDir(), common.EtcdWorkspaceCleanerLockKey, workspaceExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}

Expand All @@ -1425,16 +1426,31 @@ func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireIn
}
}

func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
log.Debugf("workspaceCleaner")
func (s *Runservice) logCleanerLoop(ctx context.Context, logExpireInterval time.Duration) {
for {
if err := s.ObjectsCleaner(ctx, store.OSTLogsBaseDir(), common.EtcdLogCleanerLockKey, logExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}

sleepCh := time.NewTimer(logCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (s *Runservice) ObjectsCleaner(ctx context.Context, prefix string, etcdLockKey string, objectExpireInterval time.Duration) error {
log.Debug("ObjectsCleaner")

session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()

m := etcd.NewMutex(session, common.EtcdWorkspaceCleanerLockKey)
m := etcd.NewMutex(session, etcdLockKey)

if err := m.TryLock(ctx); err != nil {
if errors.Is(err, etcd.ErrLocked) {
Expand All @@ -1446,14 +1462,14 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv

doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
for object := range s.ost.List(prefix+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
if object.LastModified.Add(objectExpireInterval).Before(time.Now()) {
if err := s.ost.DeleteObject(object.Path); err != nil {
if !objectstorage.IsNotExist(err) {
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
log.Warnf("failed to delete object %q: %v", object.Path, err)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/services/runservice/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da
return action, nil
}

func OSTLogsBaseDir() string {
return "logs"
}

func OSTRunTaskLogsBaseDir(rtID string) string {
return path.Join("logs", rtID)
return path.Join(OSTLogsBaseDir(), rtID)
}

func OSTRunTaskLogsDataDir(rtID string) string {
Expand Down
Loading

0 comments on commit fc49433

Please sign in to comment.