Skip to content

Commit

Permalink
cache: set max batch size limit to prune
Browse files Browse the repository at this point in the history
Avoid keeping the cache manager lock for too long. Note
that prune is already batched based on deletion criteria
and when items become releaseable. This adds extra limit
to make sure it never gets very large.

Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Nov 10, 2023
1 parent a9a5aaf commit 30a1b0e
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
errInvalid = errors.New("invalid")
)

const maxPruneBatch = 10 // maximum number of refs to prune while holding the manager lock

type ManagerOpt struct {
Snapshotter snapshot.Snapshotter
ContentStore content.Store
Expand Down Expand Up @@ -1057,7 +1059,7 @@ func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo,
})
}

func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt pruneOpt) error {
func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt pruneOpt) (err error) {
var toDelete []*deleteRecord

if opt.keepBytes != 0 && opt.totalSize < opt.keepBytes {
Expand Down Expand Up @@ -1130,48 +1132,49 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt
lastUsedAt: c.LastUsedAt,
usageCount: c.UsageCount,
})
if !gcMode {
cr.dead = true

// mark metadata as deleted in case we crash before cleanup finished
if err := cr.queueDeleted(); err != nil {
cr.mu.Unlock()
cm.mu.Unlock()
return err
}
if err := cr.commitMetadata(); err != nil {
cr.mu.Unlock()
cm.mu.Unlock()
return err
}
} else {
locked[cr.mu] = struct{}{}
continue // leave the record locked
}
locked[cr.mu] = struct{}{}
continue // leave the record locked
}
}
cr.mu.Unlock()
}

batchSize := len(toDelete)
if gcMode && len(toDelete) > 0 {
batchSize = 1
sortDeleteRecords(toDelete)
var err error
for i, cr := range toDelete {
// only remove single record at a time
if i == 0 {
cr.dead = true
err = cr.queueDeleted()
if err == nil {
err = cr.commitMetadata()
}
} else if batchSize > maxPruneBatch {
batchSize = maxPruneBatch
}

releaseLocks := func() {
for _, cr := range toDelete {
if !cr.released {
cr.released = true
cr.mu.Unlock()
}
cr.mu.Unlock()
}
if err != nil {
return err
cm.mu.Unlock()
}

for i, cr := range toDelete {
// only remove single record at a time
if i < batchSize {
cr.dead = true
// mark metadata as deleted in case we crash before cleanup finished
if err := cr.queueDeleted(); err != nil {
releaseLocks()
return err
}
if err := cr.commitMetadata(); err != nil {
releaseLocks()
return err
}
}
toDelete = toDelete[:1]
cr.mu.Unlock()
cr.released = true
}
toDelete = toDelete[:batchSize]

cm.mu.Unlock()

Expand All @@ -1195,7 +1198,6 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt
}

cm.mu.Lock()
var err error
for _, cr := range toDelete {
cr.mu.Lock()

Expand Down Expand Up @@ -1613,6 +1615,7 @@ type deleteRecord struct {
usageCount int
lastUsedAtIndex int
usageCountIndex int
released bool
}

func sortDeleteRecords(toDelete []*deleteRecord) {
Expand Down

0 comments on commit 30a1b0e

Please sign in to comment.