From 2ad584561ee7f891b3e49efb4513d9d761e46873 Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Mon, 17 Jun 2024 14:52:05 -0400
Subject: [PATCH] storage and bucket summary: move and parallelize on-disk sizing

* remove `fs.OnDiskSize` ('du') from the job's BEGIN phase
* run it in parallel with walking objects and, possibly, counting remotes
* refactor `newSumm` construction

Signed-off-by: Alex Aizman
---
 xact/xs/nsumm.go | 73 ++++++++++++++++++++++++++++++++++--------------
 1 file changed, 52 insertions(+), 21 deletions(-)

diff --git a/xact/xs/nsumm.go b/xact/xs/nsumm.go
index 3b9a1d6d88..d8c0dc0e6e 100644
--- a/xact/xs/nsumm.go
+++ b/xact/xs/nsumm.go
@@ -99,21 +99,28 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) {
 		DoLoad:      mpather.LoadUnsafe,
 		IncludeCopy: true,
 	}
 
-	if p.Bck.IsQuery() {
+	if !p.Bck.IsQuery() {
+		r.initRes(&r.oneRes, p.Bck) // init single result-set
+		r.single = true
+		opts.Bck = p.Bck.Clone()
+	} else {
 		var single *meta.Bck
 		r.mapRes = make(map[uint64]*cmn.BsummResult, 8)
-		opts.Buckets, single = r.initResQbck()
+
+		opts.Buckets, single = r.initResQbck() // init multiple result-sets (one per bucket)
 		nb := len(opts.Buckets)
 		switch nb {
 		case 0:
 			return r, fmt.Errorf("no buckets matching %q", p.Bck.Bucket())
 		case 1:
-			// change of mind: single even though spec-ed as qbck
+			// change of mind: single result-set even though spec-ed as qbck
 			p.Bck = single
 			opts.Buckets = nil
 			r.buckets = nil
-			goto single
+
+			r.single = true
+			opts.Bck = p.Bck.Clone()
 		default:
 			// inc num joggers to boost
 			nmps := fs.NumAvail()
@@ -121,14 +128,9 @@
 				return r, cmn.ErrNoMountpaths
 			}
 			opts.PerBucket = nb*nmps <= sys.NumCPU()
-			goto ini
 		}
 	}
-single:
-	r.initRes(&r.oneRes, p.Bck)
-	r.single = true
-	opts.Bck = p.Bck.Clone()
-ini:
+
 	r.BckJog.Init(p.UUID(), p.Kind(), p.Bck, opts, cmn.GCO.Get())
 
 	s := fmt.Sprintf("-msg-%+v", r.p.msg)
@@ -141,39 +143,69 @@ func (r *XactNsumm) Run(started *sync.WaitGroup) {
 	started.Done()
 	nlog.Infoln(r.Name(), r.p.Bck.Cname(""))
 
-	var wg cos.WG
+	var (
+		rwg, lwg cos.WG
+	)
+	// (I) remote
 	if r.listRemote {
 		// _this_ target to list-and-summ remote pages, in parallel
 		if r.single {
-			wg = &sync.WaitGroup{}
-			wg.Add(1)
+			rwg = &sync.WaitGroup{}
+			rwg.Add(1)
 			go func(wg cos.WG) {
 				r.runCloudBck(r.p.Bck, &r.oneRes)
 				wg.Done()
-			}(wg)
+			}(rwg)
 		} else {
 			debug.Assert(len(r.buckets) > 1)
-			wg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
+			rwg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
 			for _, bck := range r.buckets {
 				res, ok := r.mapRes[bck.Props.BID]
 				debug.Assert(ok, r.Name(), bck.Cname(""))
-				wg.Add(1)
-				go func(bck *meta.Bck, wg cos.WG) {
+				rwg.Add(1)
+				go func(bck *meta.Bck, res *cmn.BsummResult, wg cos.WG) {
 					r.runCloudBck(bck, res)
 					wg.Done()
-				}(bck, wg)
+				}(bck, res, rwg)
 			}
 		}
 	}
+
+	// (II) on disk size (formerly, 'du')
+	if r.single {
+		lwg = &sync.WaitGroup{}
+		lwg.Add(1)
+		go func(wg cos.WG) {
+			res := &r.oneRes
+			res.TotalSize.OnDisk = fs.OnDiskSize(r.p.Bck.Bucket(), r.p.msg.Prefix)
+			wg.Done()
+		}(lwg)
+	} else {
+		debug.Assert(len(r.buckets) > 1)
+		lwg = cos.NewLimitedWaitGroup(sys.NumCPU(), len(r.buckets))
+		for _, bck := range r.buckets {
+			res, ok := r.mapRes[bck.Props.BID]
+			debug.Assert(ok, r.Name(), bck.Cname(""))
+			lwg.Add(1)
+			go func(bck *meta.Bck, res *cmn.BsummResult, wg cos.WG) {
+				res.TotalSize.OnDisk = fs.OnDiskSize(bck.Bucket(), r.p.msg.Prefix)
+				wg.Done()
+			}(bck, res, lwg)
+		}
+	}
+
+	// (III) visit objects
 	r.BckJog.Run()
 	err := r.BckJog.Wait()
 	if err != nil {
 		r.AddErr(err)
 	}
-	if wg != nil {
+
+	lwg.Wait()
+	if rwg != nil {
 		debug.Assert(r.listRemote)
-		wg.Wait()
+		rwg.Wait()
 	}
 
 	r.Finish()
@@ -219,7 +251,6 @@ func (r *XactNsumm) initRes(res *cmn.BsummResult, bck *meta.Bck) {
 	res.Bck = bck.Clone()
 	res.TotalSize.Disks = r.totalDiskSize
 	res.ObjSize.Min = math.MaxInt64
-	res.TotalSize.OnDisk = fs.OnDiskSize(bck.Bucket(), r.p.msg.Prefix)
 }
 
 func (r *XactNsumm) String() string { return r._str }
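
Note (illustrative, not part of the patch): below is a minimal standard-library sketch of the fan-out/wait pattern this commit applies, namely sizing each "bucket" on disk in its own goroutine, bounded by CPU count, while the main object walk runs concurrently. The names onDiskSize, dirs, and results are hypothetical stand-ins for fs.OnDiskSize, the per-bucket *meta.Bck handles, and *cmn.BsummResult; the channel semaphore only approximates what cos.NewLimitedWaitGroup(sys.NumCPU(), ...) does in the patch.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"sync"
)

// onDiskSize is a hypothetical stand-in for fs.OnDiskSize: a 'du'-style walk
// that sums file sizes under dir (walk errors are skipped for brevity).
func onDiskSize(dir string) (total int64) {
	_ = filepath.Walk(dir, func(_ string, info os.FileInfo, err error) error {
		if err == nil && info != nil && !info.IsDir() {
			total += info.Size()
		}
		return nil
	})
	return total
}

func main() {
	dirs := []string{".", os.TempDir()} // stand-ins for per-bucket paths
	results := make([]int64, len(dirs)) // stand-ins for per-bucket summaries

	var (
		wg   sync.WaitGroup
		sema = make(chan struct{}, runtime.NumCPU()) // bound concurrency, like cos.NewLimitedWaitGroup
	)
	// (II) size each "bucket" on disk in its own goroutine...
	for i := range dirs {
		wg.Add(1)
		sema <- struct{}{}
		go func(i int) {
			defer func() { <-sema; wg.Done() }()
			results[i] = onDiskSize(dirs[i])
		}(i)
	}

	// (III) ...while the main object walk (r.BckJog.Run/Wait in the patch)
	// would run here, overlapping with the sizing goroutines above.

	wg.Wait() // corresponds to lwg.Wait() before r.Finish()
	for i, dir := range dirs {
		fmt.Printf("%s: %d bytes\n", dir, results[i])
	}
}

The design point, per the commit message: the 'du'-style sizing is I/O-bound and independent of the object walk, so overlapping the two (and waiting for it only after BckJog completes) hides its latency instead of paying for it up front in the job's BEGIN phase.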