Skip to content

Commit

Permalink
core/connpool: scrub lazily on Put()
Browse files Browse the repository at this point in the history
unclear if timer.Ticker would force wake up and/or prevent sleep
  • Loading branch information
ignoramous committed Oct 19, 2024
1 parent a2ceb84 commit 9539876
Showing 1 changed file with 45 additions and 30 deletions.
75 changes: 45 additions & 30 deletions intra/core/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"golang.org/x/sys/unix"
)

const useread = false // always false; here for doc purposes
const poolcapacity = 8 // default capacity
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
const Nobody = uintptr(0) // nobody
const useread = false // always false; here for doc purposes
const poolcapacity = 8 // default capacity
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
const Nobody = uintptr(0) // nobody
const scrubinterval = 5 * time.Minute // interval between subsequent scrubs

var errUnexpectedRead error = errors.New("pool: unexpected read")

Expand All @@ -34,42 +35,55 @@ type superpool[T comparable] struct {
}

type MultConnPool[T comparable] struct {
ctx context.Context
mu sync.RWMutex
m map[T]*superpool[T]
ctx context.Context
mu sync.RWMutex
m map[T]*superpool[T]
scrubtime time.Time
}

func NewMultConnPool[T comparable](ctx context.Context) *MultConnPool[T] {
mc := &MultConnPool[T]{
return &MultConnPool[T]{
ctx: ctx,
m: make(map[T]*superpool[T]),
}
every10m := time.NewTicker(10 * time.Minute)
go mc.scrub(ctx, every10m)
return mc
}

func (m *MultConnPool[T]) scrub(ctx context.Context, tick *time.Ticker) {
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
m.mu.Lock()
for id, super := range m.m {
if !super.pool.closed.Load() {
delete(m.m, id)
} else if super.pool.empty() {
super.quit()
delete(m.m, id)
} else {
go super.pool.scrub()
}
func (m *MultConnPool[T]) scrub() {
now := time.Now()
if now.Sub(m.scrubtime) <= scrubinterval { // too soon
return
}
m.scrubtime = now

select {
case <-m.ctx.Done():
return
default:
}

Go("superpool.scrub", func() {
m.mu.Lock()
defer m.mu.Unlock()

var n, nclosed, nquit, nscrubbed int
n = len(m.m)
for id, super := range m.m {
if super.pool.closed.Load() {
nclosed++
delete(m.m, id)
} else if super.pool.empty() {
nquit++
super.quit()
delete(m.m, id)
} else {
nscrubbed++
Go("poo.scrub", super.pool.scrub)
}
m.mu.Unlock()
}
}

log.D("pool: scrubbed: %d, closed: %d, quit: %d, total: %d",
nscrubbed, nclosed, nquit, n)
})
}

func (m *MultConnPool[T]) Get(id T) net.Conn {
Expand Down Expand Up @@ -106,6 +120,7 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool {
m.mu.Unlock()
}

m.scrub()
return super.pool.Put(conn)
}

Expand Down

1 comment on commit 9539876

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.