Skip to content

Commit

Permalink
meta/sql: fix flush quotas deadlock (#5706)
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
  • Loading branch information
jiefenghuang authored Mar 4, 2025
1 parent d0d7931 commit f9bd33e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type engine interface {
doSetQuota(ctx Context, inode Ino, quota *Quota) (created bool, err error)
doDelQuota(ctx Context, inode Ino) error
doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error
doFlushQuotas(ctx Context, quotas []*iQuota) error

doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode uint8, attr *Attr) syscall.Errno
Expand Down
31 changes: 19 additions & 12 deletions pkg/meta/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type Quota struct {
newSpace, newInodes int64
}

type iQuota struct {
inode Ino
quota *Quota
}

// Returns true if it will exceed the quota limit
func (q *Quota) check(space, inodes int64) bool {
if space > 0 {
Expand Down Expand Up @@ -411,33 +416,35 @@ func (m *baseMeta) doFlushQuotas() {
if !m.getFormat().DirStats {
return
}
stageMap := make(map[Ino]*Quota)

var quotas []*iQuota
m.quotaMu.RLock()
var newSpace, newInodes int64
for ino, q := range m.dirQuotas {
newSpace := atomic.LoadInt64(&q.newSpace)
newInodes := atomic.LoadInt64(&q.newInodes)
newSpace = atomic.LoadInt64(&q.newSpace)
newInodes = atomic.LoadInt64(&q.newInodes)
if newSpace != 0 || newInodes != 0 {
stageMap[ino] = &Quota{newSpace: newSpace, newInodes: newInodes}
quotas = append(quotas, &iQuota{inode: ino, quota: &Quota{newSpace: newSpace, newInodes: newInodes}})
}
}
m.quotaMu.RUnlock()
if len(stageMap) == 0 {
if len(quotas) == 0 {
return
}

if err := m.en.doFlushQuotas(Background(), stageMap); err != nil {
if err := m.en.doFlushQuotas(Background(), quotas); err != nil {
logger.Warnf("Flush quotas: %s", err)
} else {
m.quotaMu.RLock()
for ino, snap := range stageMap {
q := m.dirQuotas[ino]
for _, snap := range quotas {
q := m.dirQuotas[snap.inode]
if q == nil {
continue
}
atomic.AddInt64(&q.newSpace, -snap.newSpace)
atomic.AddInt64(&q.UsedSpace, snap.newSpace)
atomic.AddInt64(&q.newInodes, -snap.newInodes)
atomic.AddInt64(&q.UsedInodes, snap.newInodes)
atomic.AddInt64(&q.newSpace, -snap.quota.newSpace)
atomic.AddInt64(&q.UsedSpace, snap.quota.newSpace)
atomic.AddInt64(&q.newInodes, -snap.quota.newInodes)
atomic.AddInt64(&q.UsedInodes, snap.quota.newInodes)
}
m.quotaMu.RUnlock()
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3594,12 +3594,12 @@ func (m *redisMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
})
}

func (m *redisMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error {
func (m *redisMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error {
_, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for ino, q := range quotas {
field := ino.String()
pipe.HIncrBy(ctx, m.dirQuotaUsedSpaceKey(), field, q.newSpace)
pipe.HIncrBy(ctx, m.dirQuotaUsedInodesKey(), field, q.newInodes)
for _, q := range quotas {
field := q.inode.String()
pipe.HIncrBy(ctx, m.dirQuotaUsedSpaceKey(), field, q.quota.newSpace)
pipe.HIncrBy(ctx, m.dirQuotaUsedInodesKey(), field, q.quota.newInodes)
}
return nil
})
Expand Down
7 changes: 4 additions & 3 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3523,11 +3523,12 @@ func (m *dbMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
return quotas, nil
}

func (m *dbMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error {
func (m *dbMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error {
sort.Slice(quotas, func(i, j int) bool { return quotas[i].inode < quotas[j].inode })
return m.txn(func(s *xorm.Session) error {
for ino, q := range quotas {
for _, q := range quotas {
_, err := s.Exec("update jfs_dir_quota set used_space=used_space+?, used_inodes=used_inodes+? where inode=?",
q.newSpace, q.newInodes, ino)
q.quota.newSpace, q.quota.newInodes, q.inode)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2796,13 +2796,13 @@ func (m *kvMeta) doSyncUsedSpace(ctx Context) error {
return m.setValue(m.counterKey(usedSpace), packCounter(used))
}

func (m *kvMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error {
func (m *kvMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error {
return m.txn(ctx, func(tx *kvTxn) error {
keys := make([][]byte, 0, len(quotas))
qs := make([]*Quota, 0, len(quotas))
for ino, q := range quotas {
keys = append(keys, m.dirQuotaKey(ino))
qs = append(qs, q)
for _, q := range quotas {
keys = append(keys, m.dirQuotaKey(q.inode))
qs = append(qs, q.quota)
}
for i, v := range tx.gets(keys...) {
if len(v) == 0 {
Expand Down

0 comments on commit f9bd33e

Please sign in to comment.