Skip to content

Commit

Permalink
core/connpool: m refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Oct 20, 2024
1 parent 6b50497 commit e2b1be6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 51 deletions.
96 changes: 48 additions & 48 deletions intra/core/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ import (
"golang.org/x/sys/unix"
)

const useread = false // never used; for documentation only
const poolcapacity = 8 // default capacity
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
const Nobody = uintptr(0) // nobody
const scrubinterval = 5 * time.Minute // interval between subsequent scrubs
const maxttl = 8 * time.Minute // close unused pooled conns after this period
const pooluseread = false // never used; for documentation only
const poolcapacity = 8 // default capacity
const poolmaxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
const Nobody = uintptr(0) // nobody
const poolscrubinterval = 5 * time.Minute // interval between subsequent scrubs
const poolmaxttl = 8 * time.Minute // close unused pooled conns after this period

// go.dev/play/p/ig2Zpk-LTSv
var (
kaidle = int(maxttl / 5 / time.Second) // 8m / 5 => 96s
kainterval = int(maxttl / 10 / time.Second) // 8m / 10 => 48s
kaidle = int(poolmaxttl / 5 / time.Second) // 8m / 5 => 96s
kainterval = int(poolmaxttl / 10 / time.Second) // 8m / 10 => 48s
)

var (
Expand Down Expand Up @@ -63,7 +63,7 @@ func NewMultConnPool[T comparable](ctx context.Context) *MultConnPool[T] {

func (m *MultConnPool[T]) scrub() {
now := time.Now()
if now.Sub(m.scrubtime) <= scrubinterval { // too soon
if now.Sub(m.scrubtime) <= poolscrubinterval { // too soon
return
}
m.scrubtime = now
Expand Down Expand Up @@ -138,33 +138,32 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) (ok bool) {
}

type agingconn struct {
c net.Conn
sc syscall.Conn
dob time.Time
str string
c net.Conn // pooled conn
sc PoolableConn // raw conn; may be nil
dob time.Time // induction time
str string // local and remote addrs
}

func newAgingConn(c net.Conn) agingconn {
var sc PoolableConn

s := conn2str(c)
if sc, ok := c.(syscall.Conn); ok {
return agingconn{c, sc, time.Now(), s}
} else if dc, ok := c.(*dns.Conn); ok {
if tc, ok := dc.Conn.(*tls.Conn); ok {
if sc, ok := tc.NetConn().(syscall.Conn); ok {
return agingconn{c, sc, time.Now(), s}
}
log.W("pool: dns.Conn not sys.Conn: %T", tc.NetConn())
} else if dc, ok := dc.Conn.(syscall.Conn); ok {
return agingconn{c, dc, time.Now(), s}
}
log.W("pool: dns.Conn not sys.Conn: %T", dc.Conn)
} else if tc, ok := c.(*tls.Conn); ok {
if sc, ok := tc.NetConn().(syscall.Conn); ok {
return agingconn{c, sc, time.Now(), s}
}
log.W("pool: conn not a sys.Conn: %T", c)
}
return agingconn{c, nil, time.Time{}, s}
if sc, _ = c.(PoolableConn); sc != nil {
// ok
} else if dc, _ := c.(*dns.Conn); dc != nil {
if tc, _ := dc.Conn.(*tls.Conn); tc != nil {
if sc, _ = tc.NetConn().(PoolableConn); sc == nil {
log.W("pool: dnsconn != sysconn: %T", tc.NetConn())
} // else: ok
} else if sc, _ = dc.Conn.(PoolableConn); sc == nil {
log.W("pool: dnsconn != sysconn: %T", dc.Conn)
} // else: ok
} else if tc, _ := c.(*tls.Conn); tc != nil {
if sc, _ = tc.NetConn().(PoolableConn); sc == nil {
log.W("pool: tlsconn != sysconn: %T", tc.NetConn())
} // else: ok
} // sc is nil
return agingconn{c, sc, time.Time{}, s}
}

// github.com/redis/go-redis/blob/d9eeed13/internal/pool/pool.go
Expand Down Expand Up @@ -197,13 +196,13 @@ func (c *ConnPool[T]) Get() (zz net.Conn) {

pooled, complete := Grx("pool.get", func(ctx context.Context) (zz net.Conn) {
i := 0
for i < maxattempts {
for i < poolmaxattempts {
i++
select {
case aconn := <-c.p:
// if readable, return conn regardless of its freshness
if aconn.readable() {
aconn.nokeepalive()
aconn.keepalive(false)
return aconn.c
}
(&aconn).close()
Expand Down Expand Up @@ -247,7 +246,7 @@ func (c *ConnPool[T]) Put(conn net.Conn) (ok bool) {

select {
case c.p <- aconn:
aconn.keepalive()
aconn.keepalive(true)
return true
case <-c.ctx.Done(): // stop
return false
Expand Down Expand Up @@ -327,7 +326,7 @@ func (a agingconn) ok() bool {

func (a agingconn) fresh() bool {
return a.dob != (time.Time{}) &&
time.Since(a.dob) < maxttl
time.Since(a.dob) < poolmaxttl
}

func (a *agingconn) close() {
Expand All @@ -344,34 +343,35 @@ func (a agingconn) readable() bool {
return err == nil
}

func (a agingconn) keepalive() bool {
cleardeadline(a.c) // reset any previous timeout
return SetKeepAliveConfigSockOpt(a.c, kaidle, kainterval)
}

func (a agingconn) nokeepalive() bool {
if tc, ok := a.c.(*net.TCPConn); ok {
return tc.SetKeepAlive(false) == nil
func (a agingconn) keepalive(y bool) bool {
if y {
cleardeadline(a.c) // reset any previous timeout
return SetKeepAliveConfigSockOpt(a.c, kaidle, kainterval)
} else {
if tc, ok := a.c.(*net.TCPConn); ok {
return tc.SetKeepAlive(false) == nil
}
return false
}
return false
}

// github.com/go-sql-driver/mysql/blob/f20b28636/conncheck.go
// github.com/redis/go-redis/blob/cc9bcb0c0/internal/pool/conn_check.go
func (a agingconn) canread() error {
if a.sc == nil {
sc := a.sc
if sc == nil {
return errNotSyscallConn
}

var checkErr error
var ctlErr error

raw, err := a.sc.SyscallConn()
raw, err := sc.SyscallConn()
if err != nil {
return fmt.Errorf("pool: sysconn: %w", err)
}

if useread { // stackoverflow.com/q/12741386
if pooluseread { // stackoverflow.com/q/12741386
ctlErr = raw.Read(func(fd uintptr) bool {

Check failure on line 375 in intra/core/connpool.go

View workflow job for this annotation

GitHub Actions / 🧬 Build

error: Potential nil panic detected. Observed nil flow from source to dereference point:
// 0 byte reads do not work to detect readability:
// see: go-review.googlesource.com/c/go/+/23227
Expand Down
4 changes: 1 addition & 3 deletions intra/core/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ type DuplexConn interface {
}

// so it can be pooled by ConnPool.
type PoolableConn interface {
syscall.Conn
}
type PoolableConn syscall.Conn

type ICMPConn interface {
net.PacketConn
Expand Down

0 comments on commit e2b1be6

Please sign in to comment.