From 9539876597d902b9bdac0f7b16358e440ae8acf7 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 20 Oct 2024 04:42:38 +0530 Subject: [PATCH] core/connpool: scrub lazily on Put() unclear if timer.Ticker would force wake up and/or prevent sleep --- intra/core/connpool.go | 75 +++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/intra/core/connpool.go b/intra/core/connpool.go index 903760df..2ad60724 100644 --- a/intra/core/connpool.go +++ b/intra/core/connpool.go @@ -21,10 +21,11 @@ import ( "golang.org/x/sys/unix" ) -const useread = false // always false; here for doc purposes -const poolcapacity = 8 // default capacity -const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool -const Nobody = uintptr(0) // nobody +const useread = false // always false; here for doc purposes +const poolcapacity = 8 // default capacity +const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool +const Nobody = uintptr(0) // nobody +const scrubinterval = 5 * time.Minute // interval between subsequent scrubs var errUnexpectedRead error = errors.New("pool: unexpected read") @@ -34,42 +35,55 @@ type superpool[T comparable] struct { } type MultConnPool[T comparable] struct { - ctx context.Context - mu sync.RWMutex - m map[T]*superpool[T] + ctx context.Context + mu sync.RWMutex + m map[T]*superpool[T] + scrubtime time.Time } func NewMultConnPool[T comparable](ctx context.Context) *MultConnPool[T] { - mc := &MultConnPool[T]{ + return &MultConnPool[T]{ ctx: ctx, m: make(map[T]*superpool[T]), } - every10m := time.NewTicker(10 * time.Minute) - go mc.scrub(ctx, every10m) - return mc } -func (m *MultConnPool[T]) scrub(ctx context.Context, tick *time.Ticker) { - defer tick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - m.mu.Lock() - for id, super := range m.m { - if !super.pool.closed.Load() { - delete(m.m, id) - } else if super.pool.empty() { - super.quit() - delete(m.m, id) - } else { - go super.pool.scrub() - } +func (m *MultConnPool[T]) scrub() { + now := time.Now() + if now.Sub(m.scrubtime) <= scrubinterval { // too soon + return + } + m.scrubtime = now + + select { + case <-m.ctx.Done(): + return + default: + } + + Go("superpool.scrub", func() { + m.mu.Lock() + defer m.mu.Unlock() + + var n, nclosed, nquit, nscrubbed int + n = len(m.m) + for id, super := range m.m { + if super.pool.closed.Load() { + nclosed++ + delete(m.m, id) + } else if super.pool.empty() { + nquit++ + super.quit() + delete(m.m, id) + } else { + nscrubbed++ + Go("poo.scrub", super.pool.scrub) } - m.mu.Unlock() } - } + + log.D("pool: scrubbed: %d, closed: %d, quit: %d, total: %d", + nscrubbed, nclosed, nquit, n) + }) } func (m *MultConnPool[T]) Get(id T) net.Conn { @@ -106,6 +120,7 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool { m.mu.Unlock() } + m.scrub() return super.pool.Put(conn) }