diff --git a/pkg/meta/base.go b/pkg/meta/base.go index d0b4d6a06e52..bea4d9c2758f 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -96,7 +96,7 @@ type engine interface { doSetQuota(ctx Context, inode Ino, quota *Quota) (created bool, err error) doDelQuota(ctx Context, inode Ino) error doLoadQuotas(ctx Context) (map[Ino]*Quota, error) - doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error + doFlushQuotas(ctx Context, quotas []*iQuota) error doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode uint8, attr *Attr) syscall.Errno diff --git a/pkg/meta/quota.go b/pkg/meta/quota.go index 9b7c63e0c2f3..e9fbf5943eb6 100644 --- a/pkg/meta/quota.go +++ b/pkg/meta/quota.go @@ -41,6 +41,11 @@ type Quota struct { newSpace, newInodes int64 } +type iQuota struct { + inode Ino + quota *Quota +} + // Returns true if it will exceed the quota limit func (q *Quota) check(space, inodes int64) bool { if space > 0 { @@ -411,33 +416,35 @@ func (m *baseMeta) doFlushQuotas() { if !m.getFormat().DirStats { return } - stageMap := make(map[Ino]*Quota) + + var quotas []*iQuota m.quotaMu.RLock() + var newSpace, newInodes int64 for ino, q := range m.dirQuotas { - newSpace := atomic.LoadInt64(&q.newSpace) - newInodes := atomic.LoadInt64(&q.newInodes) + newSpace = atomic.LoadInt64(&q.newSpace) + newInodes = atomic.LoadInt64(&q.newInodes) if newSpace != 0 || newInodes != 0 { - stageMap[ino] = &Quota{newSpace: newSpace, newInodes: newInodes} + quotas = append(quotas, &iQuota{inode: ino, quota: &Quota{newSpace: newSpace, newInodes: newInodes}}) } } m.quotaMu.RUnlock() - if len(stageMap) == 0 { + if len(quotas) == 0 { return } - if err := m.en.doFlushQuotas(Background(), stageMap); err != nil { + if err := m.en.doFlushQuotas(Background(), quotas); err != nil { logger.Warnf("Flush quotas: %s", err) } else { m.quotaMu.RLock() - for ino, snap := range stageMap { - q := m.dirQuotas[ino] + for _, snap := range quotas { + q := m.dirQuotas[snap.inode] if q == nil { continue } - atomic.AddInt64(&q.newSpace, -snap.newSpace) - atomic.AddInt64(&q.UsedSpace, snap.newSpace) - atomic.AddInt64(&q.newInodes, -snap.newInodes) - atomic.AddInt64(&q.UsedInodes, snap.newInodes) + atomic.AddInt64(&q.newSpace, -snap.quota.newSpace) + atomic.AddInt64(&q.UsedSpace, snap.quota.newSpace) + atomic.AddInt64(&q.newInodes, -snap.quota.newInodes) + atomic.AddInt64(&q.UsedInodes, snap.quota.newInodes) } m.quotaMu.RUnlock() } diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 2c627168050d..2487daa1d7a5 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -3594,12 +3594,12 @@ func (m *redisMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) { }) } -func (m *redisMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error { +func (m *redisMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error { _, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - for ino, q := range quotas { - field := ino.String() - pipe.HIncrBy(ctx, m.dirQuotaUsedSpaceKey(), field, q.newSpace) - pipe.HIncrBy(ctx, m.dirQuotaUsedInodesKey(), field, q.newInodes) + for _, q := range quotas { + field := q.inode.String() + pipe.HIncrBy(ctx, m.dirQuotaUsedSpaceKey(), field, q.quota.newSpace) + pipe.HIncrBy(ctx, m.dirQuotaUsedInodesKey(), field, q.quota.newInodes) } return nil }) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 352b93cd534d..0eb2f7cf749f 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -3523,11 +3523,12 @@ func (m *dbMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) { return quotas, nil } -func (m *dbMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error { +func (m *dbMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error { + sort.Slice(quotas, func(i, j int) bool { return quotas[i].inode < quotas[j].inode }) return m.txn(func(s *xorm.Session) error { - for ino, q := range quotas { + for _, q := range quotas { _, err := s.Exec("update jfs_dir_quota set used_space=used_space+?, used_inodes=used_inodes+? where inode=?", - q.newSpace, q.newInodes, ino) + q.quota.newSpace, q.quota.newInodes, q.inode) if err != nil { return err } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 114c5a046a2c..515600f95aa2 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2796,13 +2796,13 @@ func (m *kvMeta) doSyncUsedSpace(ctx Context) error { return m.setValue(m.counterKey(usedSpace), packCounter(used)) } -func (m *kvMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error { +func (m *kvMeta) doFlushQuotas(ctx Context, quotas []*iQuota) error { return m.txn(ctx, func(tx *kvTxn) error { keys := make([][]byte, 0, len(quotas)) qs := make([]*Quota, 0, len(quotas)) - for ino, q := range quotas { - keys = append(keys, m.dirQuotaKey(ino)) - qs = append(qs, q) + for _, q := range quotas { + keys = append(keys, m.dirQuotaKey(q.inode)) + qs = append(qs, q.quota) } for i, v := range tx.gets(keys...) { if len(v) == 0 {