From 91437d1564eeaf1c2d2e68ac11137a8de709fff3 Mon Sep 17 00:00:00 2001 From: Murtaza Aliakbar Date: Sun, 20 Oct 2024 05:54:49 +0530 Subject: [PATCH] core/connpool: tcp keepalive & max ttl --- intra/core/connpool.go | 67 ++++++++++++++++++++---------- intra/core/ping.go | 6 +++ intra/{protect => core}/sockopt.go | 60 +++++++++++++++----------- intra/ipn/auto.go | 12 +++++- intra/ipn/base.go | 4 +- intra/ipn/exit.go | 3 +- 6 files changed, 100 insertions(+), 52 deletions(-) rename intra/{protect => core}/sockopt.go (57%) diff --git a/intra/core/connpool.go b/intra/core/connpool.go index 719977f4..232b9823 100644 --- a/intra/core/connpool.go +++ b/intra/core/connpool.go @@ -26,7 +26,13 @@ const poolcapacity = 8 // default capacity const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool const Nobody = uintptr(0) // nobody const scrubinterval = 5 * time.Minute // interval between subsequent scrubs +const maxttl = 8 * time.Minute // close unused pooled conns after this period +// go.dev/play/p/ig2Zpk-LTSv +var ( + kaidle = int(maxttl / 5 / time.Second) // 8m / 5 => 96s + kainterval = int(maxttl / 10 / time.Second) // 8m / 10 => 48s +) var errUnexpectedRead error = errors.New("pool: unexpected read") type superpool[T comparable] struct { @@ -124,11 +130,16 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool { return super.pool.Put(conn) } +type timedconn struct { + c net.Conn + dob time.Time +} + // github.com/redis/go-redis/blob/d9eeed13/internal/pool/pool.go type ConnPool[T comparable] struct { ctx context.Context id T - p chan net.Conn // never closed + p chan timedconn // never closed closed atomic.Bool } @@ -136,7 +147,7 @@ func NewConnPool[T comparable](ctx context.Context, id T) *ConnPool[T] { c := &ConnPool[T]{ ctx: ctx, id: id, - p: make(chan net.Conn, poolcapacity), + p: make(chan timedconn, poolcapacity), } context.AfterFunc(ctx, c.clean) @@ -157,13 +168,13 @@ func (c *ConnPool[T]) Get() (zz net.Conn) { for i < maxattempts { i++ select { - case conn := <-c.p: - if readable(conn) { - // reset previous timeout - _ = conn.SetDeadline(time.Time{}) - return conn + case tconn := <-c.p: + // if readable, return conn regardless of its freshness + if readable(tconn.c) { + nokeepalive(tconn.c) + return tconn.c } - clos(conn) + CloseConn(tconn.c) case <-ctx.Done(): return // signal stop default: @@ -189,8 +200,11 @@ func (c *ConnPool[T]) Put(conn net.Conn) (ok bool) { return } + tconn := timedconn{conn, time.Now()} select { - case c.p <- conn: + case c.p <- tconn: + cleardeadline(conn) // reset any previous timeout + keepalive(conn) return true case <-c.ctx.Done(): // stop return false @@ -214,8 +228,8 @@ func (c *ConnPool[T]) clean() { log.I("pool: %v closed? %t", c.id, ok) for { select { - case conn := <-c.p: - clos(conn) + case tconn := <-c.p: + CloseConn(tconn.c) default: return } @@ -229,18 +243,18 @@ func (c *ConnPool[T]) scrub() { } select { - case conn := <-c.p: - if readable(conn) { + case tconn := <-c.p: + if fresh(tconn.dob) && readable(tconn.c) { select { - case c.p <- conn: + case c.p <- tconn: // update dob only on Put() case <-c.ctx.Done(): // stop - clos(conn) + CloseConn(tconn.c) return default: // full - clos(conn) + CloseConn(tconn.c) } } else { - clos(conn) + CloseConn(tconn.c) } case <-c.ctx.Done(): return @@ -250,6 +264,10 @@ func (c *ConnPool[T]) scrub() { } } +func fresh(t time.Time) bool { + return time.Since(t) < maxttl +} + // github.com/golang/go/issues/15735 func readable(c net.Conn) bool { var err error @@ -264,10 +282,6 @@ func readable(c net.Conn) bool { return err == nil } -func clos(c net.Conn) { - CloseConn(c) -} - // github.com/go-sql-driver/mysql/blob/f20b28636/conncheck.go // github.com/redis/go-redis/blob/cc9bcb0c0/internal/pool/conn_check.go func canread(sc syscall.Conn) error { @@ -317,6 +331,17 @@ func canread(sc syscall.Conn) error { return errors.Join(ctlErr, checkErr) // may return nil } +func keepalive(c net.Conn) bool { + return SetKeepAliveConfigSockOpt(c, kaidle, kainterval) +} + +func nokeepalive(c net.Conn) bool { + if tc, ok := c.(*net.TCPConn); ok { + return tc.SetKeepAlive(false) == nil + } + return false +} + func logev(err error) log.LogFn { return logevif(err != nil) } diff --git a/intra/core/ping.go b/intra/core/ping.go index a66e4136..c16e9921 100644 --- a/intra/core/ping.go +++ b/intra/core/ping.go @@ -180,6 +180,12 @@ func extend(c MinConn) { } } +func cleardeadline(c MinConn) { + if c != nil { + _ = c.SetDeadline(time.Time{}) + } +} + func payload() (t []byte, tslen int, err error) { randomPayload := make([]byte, 16) _, err = rand.Read(randomPayload[:]) diff --git a/intra/protect/sockopt.go b/intra/core/sockopt.go similarity index 57% rename from intra/protect/sockopt.go rename to intra/core/sockopt.go index e418458d..5bd645c1 100644 --- a/intra/protect/sockopt.go +++ b/intra/core/sockopt.go @@ -4,14 +4,13 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package protect +package core import ( "net" "syscall" "github.com/celzero/firestack/intra/log" - "github.com/celzero/firestack/intra/settings" "golang.org/x/sys/unix" ) @@ -34,59 +33,72 @@ var ( } ) -func SetKeepAliveConfig(c Conn) bool { - if !settings.GetDialerOpts().LowerKeepAlive { - return false - } - +func SetKeepAliveConfig(c MinConn) bool { if tc, ok := c.(*net.TCPConn); ok { return tc.SetKeepAliveConfig(kacfg) == nil } return false } -func SetKeepAliveConfigSockOpt(c Conn) bool { - if !settings.GetDialerOpts().LowerKeepAlive { - return false - } +// SetKeepAliveConfigSockOpt sets for a TCP connection, SO_KEEPALIVE, +// TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT, TCP_USER_TIMEOUT. +// args is optional, and should be in the order of idle, interval, count. +func SetKeepAliveConfigSockOpt(c MinConn, args ...int) (ok bool) { + var tc *net.TCPConn + if tc, ok = c.(*net.TCPConn); ok { + id := conn2str(tc) - if tc, ok := c.(*net.TCPConn); ok { rawConn, err := tc.SyscallConn() if err != nil || rawConn == nil { ok = false return ok } + + idle := defaultIdle // secs + interval := defaultInterval // secs + count := defaultCount + if len(args) >= 1 && args[0] > 0 { + idle = args[0] + } + if len(args) >= 2 && args[1] > 0 { + interval = args[1] + } + if len(args) >= 3 && args[2] > 0 { + count = args[2] + } + usertimeoutms := idle*1000 + (interval * count) // millis + + ok = true err = rawConn.Control(func(fd uintptr) { sock := int(fd) if err := syscall.SetsockoptInt(sock, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(true)); err != nil { - log.D("set SO_KEEPALIVE failed: %v", err) + log.D("set SO_KEEPALIVE %s failed: %v", id, err) ok = false } - if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, defaultIdle); err != nil { - log.D("set TCP_KEEPIDLE failed: %v", err) + if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, idle); err != nil { + log.D("set TCP_KEEPIDLE %s failed: %ds, %v", id, idle, err) ok = false } - if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, defaultInterval); err != nil { - log.D("set TCP_KEEPINTVL failed: %v", err) + if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, interval); err != nil { + log.D("set TCP_KEEPINTVL %s failed: %ds, %v", id, interval, err) ok = false } - if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, defaultCount); err != nil { - log.D("set TCP_KEEPCNT failed: %v", err) + if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, count); err != nil { + log.D("set TCP_KEEPCNT %s failed: #%d, %v", id, count, err) ok = false } // code.googlesource.com/google-api-go-client/+/master/transport/grpc/dial_socketopt.go#30 - if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usrTimeoutMillis); err != nil { - log.D("set TCP_USER_TIMEOUT failed: %v", err) + if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usertimeoutms); err != nil { + log.D("set TCP_USER_TIMEOUT %s failed: %dms, %v", id, usertimeoutms, err) ok = false } }) if err != nil { - log.E("RawConn.Control() failed: %v", err) + log.E("dialers: sockopt: %s RawConn.Control() err: %v", id, err) ok = false } - return ok } - return false + return ok } func boolint(b bool) int { diff --git a/intra/ipn/auto.go b/intra/ipn/auto.go index 1f793fe4..e19fda76 100644 --- a/intra/ipn/auto.go +++ b/intra/ipn/auto.go @@ -9,6 +9,7 @@ package ipn import ( "context" "math/rand" + "net" "net/netip" "time" @@ -16,6 +17,7 @@ import ( "github.com/celzero/firestack/intra/core" "github.com/celzero/firestack/intra/log" "github.com/celzero/firestack/intra/protect" + "github.com/celzero/firestack/intra/settings" ) var ( @@ -114,8 +116,7 @@ func (h *auto) Dial(network, addr string) (protect.Conn, error) { h.exp.K(addr, who, ttl30s) h.status.Store(TOK) } - // adjust TCP keepalive config if c is a TCPConn - protect.SetKeepAliveConfigSockOpt(c) + maybeKeepAlive(c) log.I("proxy: auto: w(%d) pin(%t/%d), dial(%s) %s; err? %v", who, recent, previdx, network, addr, err) return c, err @@ -263,3 +264,10 @@ func ip4to6(prefix96 netip.Prefix, ip4 netip.Addr) netip.Addr { } return netip.AddrFrom16(s6) } + +func maybeKeepAlive(c net.Conn) { + if settings.GetDialerOpts().LowerKeepAlive { + // adjust TCP keepalive config if c is a TCPConn + core.SetKeepAliveConfigSockOpt(c) + } +} diff --git a/intra/ipn/base.go b/intra/ipn/base.go index ab0e2816..ece0ae95 100644 --- a/intra/ipn/base.go +++ b/intra/ipn/base.go @@ -55,9 +55,7 @@ func (h *base) Dial(network, addr string) (c protect.Conn, err error) { } defer localDialStatus(h.status, err) - //Adjust TCP keepalive config if c is a TCPConn - protect.SetKeepAliveConfigSockOpt(c) - + maybeKeepAlive(c) log.I("proxy: base: dial(%s) to %s; err? %v", network, addr, err) return } diff --git a/intra/ipn/exit.go b/intra/ipn/exit.go index 9681949e..42849dfa 100644 --- a/intra/ipn/exit.go +++ b/intra/ipn/exit.go @@ -49,8 +49,7 @@ func (h *exit) Dial(network, addr string) (protect.Conn, error) { // exit always splits c, err := localDialStrat(h.outbound, network, addr) defer localDialStatus(h.status, err) - // adjust TCP keepalive config if c is a TCPConn - protect.SetKeepAliveConfigSockOpt(c) + maybeKeepAlive(c) log.I("proxy: exit: dial(%s) to %s; err? %v", network, addr, err) return c, err }