storage and bucket summary: move and parallelize on-disk sizing
* remove `fs.OnDiskSize` ('du') from the job's BEGIN phase
* run it in parallel with walking objects and, possibly, counting
  remotes
* refactor `newSumm` construction

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Jun 17, 2024
1 parent 68a5a95, commit 2ad5845

1 changed file: xact/xs/nsumm.go (52 additions, 21 deletions)
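In a nutshell: rather than computing the bucket's on-disk size ('du') up front in the BEGIN phase, the job now kicks it off concurrently with the object walk and, when applicable, with remote listing. A minimal standalone sketch of that pattern follows; the function names (`onDiskSize`, `walkObjects`, `countRemote`) are illustrative placeholders, not the actual AIStore APIs:

	package main

	import (
		"fmt"
		"sync"
	)

	// Illustrative placeholders for the three kinds of work the xaction overlaps.
	func onDiskSize() int64 { return 1 << 20 } // formerly a blocking BEGIN-phase step
	func walkObjects() int  { return 1000 }    // visit objects via bucket joggers
	func countRemote() int  { return 42 }      // list-and-summarize remote pages

	func main() {
		var (
			wg     sync.WaitGroup
			onDisk int64
			remote int
		)
		wg.Add(1)
		go func() { // (I) remote listing, in parallel
			defer wg.Done()
			remote = countRemote()
		}()
		wg.Add(1)
		go func() { // (II) on-disk sizing, in parallel (no longer up front)
			defer wg.Done()
			onDisk = onDiskSize()
		}()
		objs := walkObjects() // (III) the object walk proceeds meanwhile

		wg.Wait() // join (I) and (II) before finalizing the summary
		fmt.Println(objs, remote, onDisk)
	}

The actual diff below joins in the same order: `r.BckJog.Wait()` for the walk, then `lwg.Wait()` for the sizing goroutines, then `rwg.Wait()` for remote listing.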
@@ -99,36 +99,38 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) {
 		DoLoad:      mpather.LoadUnsafe,
 		IncludeCopy: true,
 	}
-	if p.Bck.IsQuery() {
+	if !p.Bck.IsQuery() {
+		r.initRes(&r.oneRes, p.Bck) // init single result-set
+		r.single = true
+		opts.Bck = p.Bck.Clone()
+	} else {
 		var single *meta.Bck
 		r.mapRes = make(map[uint64]*cmn.BsummResult, 8)
-		opts.Buckets, single = r.initResQbck()
+		opts.Buckets, single = r.initResQbck() // init multiple result-sets (one per bucket)
+
 		nb := len(opts.Buckets)
 		switch nb {
 		case 0:
 			return r, fmt.Errorf("no buckets matching %q", p.Bck.Bucket())
 		case 1:
-			// change of mind: single even though spec-ed as qbck
+			// change of mind: single result-set even though spec-ed as qbck
 			p.Bck = single
 			opts.Buckets = nil
 			r.buckets = nil
-			goto single
+
+			r.single = true
+			opts.Bck = p.Bck.Clone()
 		default:
 			// inc num joggers to boost
 			nmps := fs.NumAvail()
 			if nmps == 0 {
 				return r, cmn.ErrNoMountpaths
 			}
 			opts.PerBucket = nb*nmps <= sys.NumCPU()
-			goto ini
 		}
 	}
-single:
-	r.initRes(&r.oneRes, p.Bck)
-	r.single = true
-	opts.Bck = p.Bck.Clone()
-ini:
+
 	r.BckJog.Init(p.UUID(), p.Kind(), p.Bck, opts, cmn.GCO.Get())
 
 	s := fmt.Sprintf("-msg-%+v", r.p.msg)
@@ -141,39 +143,69 @@ func (r *XactNsumm) Run(started *sync.WaitGroup) {
 	started.Done()
 	nlog.Infoln(r.Name(), r.p.Bck.Cname(""))
 
-	var wg cos.WG
+	var (
+		rwg, lwg cos.WG
+	)
+	// (I) remote
 	if r.listRemote {
 		// _this_ target to list-and-summ remote pages, in parallel
 		if r.single {
-			wg = &sync.WaitGroup{}
-			wg.Add(1)
+			rwg = &sync.WaitGroup{}
+			rwg.Add(1)
 			go func(wg cos.WG) {
 				r.runCloudBck(r.p.Bck, &r.oneRes)
 				wg.Done()
-			}(wg)
+			}(rwg)
 		} else {
 			debug.Assert(len(r.buckets) > 1)
-			wg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
+			rwg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
 			for _, bck := range r.buckets {
 				res, ok := r.mapRes[bck.Props.BID]
 				debug.Assert(ok, r.Name(), bck.Cname(""))
-				wg.Add(1)
-				go func(bck *meta.Bck, wg cos.WG) {
+				rwg.Add(1)
+				go func(bck *meta.Bck, res *cmn.BsummResult, wg cos.WG) {
 					r.runCloudBck(bck, res)
 					wg.Done()
-				}(bck, wg)
+				}(bck, res, rwg)
 			}
 		}
 	}
+
+	// (II) on disk size (formerly, 'du')
+	if r.single {
+		lwg = &sync.WaitGroup{}
+		lwg.Add(1)
+		go func(wg cos.WG) {
+			res := &r.oneRes
+			res.TotalSize.OnDisk = fs.OnDiskSize(r.p.Bck.Bucket(), r.p.msg.Prefix)
+			wg.Done()
+		}(lwg)
+	} else {
+		debug.Assert(len(r.buckets) > 1)
+		lwg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
+		for _, bck := range r.buckets {
+			res, ok := r.mapRes[bck.Props.BID]
+			debug.Assert(ok, r.Name(), bck.Cname(""))
+			lwg.Add(1)
+			go func(bck *meta.Bck, res *cmn.BsummResult, wg cos.WG) {
+				res.TotalSize.OnDisk = fs.OnDiskSize(bck.Bucket(), r.p.msg.Prefix)
+				wg.Done()
+			}(bck, res, lwg)
+		}
+	}
+
+	// (III) visit objects
 	r.BckJog.Run()
 
 	err := r.BckJog.Wait()
 	if err != nil {
 		r.AddErr(err)
 	}
-	if wg != nil {
+
+	lwg.Wait()
+	if rwg != nil {
 		debug.Assert(r.listRemote)
-		wg.Wait()
+		rwg.Wait()
 	}
 
 	r.Finish()
@@ -219,7 +251,6 @@ func (r *XactNsumm) initRes(res *cmn.BsummResult, bck *meta.Bck) {
 	res.Bck = bck.Clone()
 	res.TotalSize.Disks = r.totalDiskSize
 	res.ObjSize.Min = math.MaxInt64
-	res.TotalSize.OnDisk = fs.OnDiskSize(bck.Bucket(), r.p.msg.Prefix)
 }
 
 func (r *XactNsumm) String() string { return r._str }
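Note on `cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))`: this is AIStore's internal helper; judging by its usage above, it behaves like a `sync.WaitGroup` that additionally caps the number of goroutines in flight, so per-bucket sizing cannot oversubscribe the CPUs. A rough sketch of that idea under this assumption (a generic channel-semaphore approximation, not the actual `cos` implementation):

	package main

	import "sync"

	// limitedWG: WaitGroup semantics plus a cap on concurrently running
	// goroutines, enforced with a channel used as a counting semaphore.
	// Hypothetical approximation of cos.NewLimitedWaitGroup.
	type limitedWG struct {
		wg  sync.WaitGroup
		sem chan struct{}
	}

	func newLimitedWG(limit int) *limitedWG {
		return &limitedWG{sem: make(chan struct{}, limit)}
	}

	// Add blocks once `limit` goroutines are already in flight,
	// throttling the spawning loop itself.
	func (l *limitedWG) Add(n int) {
		for i := 0; i < n; i++ {
			l.sem <- struct{}{}
		}
		l.wg.Add(n)
	}

	func (l *limitedWG) Done() {
		<-l.sem
		l.wg.Done()
	}

	func (l *limitedWG) Wait() { l.wg.Wait() }

	func main() {
		lwg := newLimitedWG(4) // at most 4 workers at once
		for i := 0; i < 16; i++ {
			lwg.Add(1)
			go func() {
				defer lwg.Done()
				// per-bucket sizing work would go here
			}()
		}
		lwg.Wait()
	}

Because `Add` blocks at the cap, the spawning loop in `Run` naturally paces itself instead of launching one goroutine per bucket all at once.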
