Skip to content

Commit

Permalink
core/connpool: tcp keepalive & max ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Oct 20, 2024
1 parent 6aaafd7 commit 91437d1
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 52 deletions.
67 changes: 46 additions & 21 deletions intra/core/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ const poolcapacity = 8 // default capacity
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
const Nobody = uintptr(0) // nobody
const scrubinterval = 5 * time.Minute // interval between subsequent scrubs
const maxttl = 8 * time.Minute // close unused pooled conns after this period

// go.dev/play/p/ig2Zpk-LTSv
var (
kaidle = int(maxttl / 5 / time.Second) // 8m / 5 => 96s
kainterval = int(maxttl / 10 / time.Second) // 8m / 10 => 48s
)
var errUnexpectedRead error = errors.New("pool: unexpected read")

type superpool[T comparable] struct {
Expand Down Expand Up @@ -124,19 +130,24 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool {
return super.pool.Put(conn)
}

type timedconn struct {
c net.Conn
dob time.Time
}

// github.com/redis/go-redis/blob/d9eeed13/internal/pool/pool.go
type ConnPool[T comparable] struct {
ctx context.Context
id T
p chan net.Conn // never closed
p chan timedconn // never closed
closed atomic.Bool
}

func NewConnPool[T comparable](ctx context.Context, id T) *ConnPool[T] {
c := &ConnPool[T]{
ctx: ctx,
id: id,
p: make(chan net.Conn, poolcapacity),
p: make(chan timedconn, poolcapacity),
}

context.AfterFunc(ctx, c.clean)
Expand All @@ -157,13 +168,13 @@ func (c *ConnPool[T]) Get() (zz net.Conn) {
for i < maxattempts {
i++
select {
case conn := <-c.p:
if readable(conn) {
// reset previous timeout
_ = conn.SetDeadline(time.Time{})
return conn
case tconn := <-c.p:
// if readable, return conn regardless of its freshness
if readable(tconn.c) {
nokeepalive(tconn.c)
return tconn.c
}
clos(conn)
CloseConn(tconn.c)
case <-ctx.Done():
return // signal stop
default:
Expand All @@ -189,8 +200,11 @@ func (c *ConnPool[T]) Put(conn net.Conn) (ok bool) {
return
}

tconn := timedconn{conn, time.Now()}
select {
case c.p <- conn:
case c.p <- tconn:
cleardeadline(conn) // reset any previous timeout
keepalive(conn)
return true
case <-c.ctx.Done(): // stop
return false
Expand All @@ -214,8 +228,8 @@ func (c *ConnPool[T]) clean() {
log.I("pool: %v closed? %t", c.id, ok)
for {
select {
case conn := <-c.p:
clos(conn)
case tconn := <-c.p:
CloseConn(tconn.c)
default:
return
}
Expand All @@ -229,18 +243,18 @@ func (c *ConnPool[T]) scrub() {
}

select {
case conn := <-c.p:
if readable(conn) {
case tconn := <-c.p:
if fresh(tconn.dob) && readable(tconn.c) {
select {
case c.p <- conn:
case c.p <- tconn: // update dob only on Put()
case <-c.ctx.Done(): // stop
clos(conn)
CloseConn(tconn.c)
return
default: // full
clos(conn)
CloseConn(tconn.c)
}
} else {
clos(conn)
CloseConn(tconn.c)
}
case <-c.ctx.Done():
return
Expand All @@ -250,6 +264,10 @@ func (c *ConnPool[T]) scrub() {
}
}

func fresh(t time.Time) bool {
return time.Since(t) < maxttl
}

// github.com/golang/go/issues/15735
func readable(c net.Conn) bool {
var err error
Expand All @@ -264,10 +282,6 @@ func readable(c net.Conn) bool {
return err == nil
}

func clos(c net.Conn) {
CloseConn(c)
}

// github.com/go-sql-driver/mysql/blob/f20b28636/conncheck.go
// github.com/redis/go-redis/blob/cc9bcb0c0/internal/pool/conn_check.go
func canread(sc syscall.Conn) error {
Expand Down Expand Up @@ -317,6 +331,17 @@ func canread(sc syscall.Conn) error {
return errors.Join(ctlErr, checkErr) // may return nil
}

func keepalive(c net.Conn) bool {
return SetKeepAliveConfigSockOpt(c, kaidle, kainterval)
}

func nokeepalive(c net.Conn) bool {
if tc, ok := c.(*net.TCPConn); ok {
return tc.SetKeepAlive(false) == nil
}
return false
}

func logev(err error) log.LogFn {
return logevif(err != nil)
}
Expand Down
6 changes: 6 additions & 0 deletions intra/core/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ func extend(c MinConn) {
}
}

func cleardeadline(c MinConn) {
if c != nil {
_ = c.SetDeadline(time.Time{})
}
}

func payload() (t []byte, tslen int, err error) {
randomPayload := make([]byte, 16)
_, err = rand.Read(randomPayload[:])
Expand Down
60 changes: 36 additions & 24 deletions intra/protect/sockopt.go → intra/core/sockopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package protect
package core

import (
"net"
"syscall"

"github.com/celzero/firestack/intra/log"
"github.com/celzero/firestack/intra/settings"
"golang.org/x/sys/unix"
)

Expand All @@ -34,59 +33,72 @@ var (
}
)

func SetKeepAliveConfig(c Conn) bool {
if !settings.GetDialerOpts().LowerKeepAlive {
return false
}

func SetKeepAliveConfig(c MinConn) bool {
if tc, ok := c.(*net.TCPConn); ok {
return tc.SetKeepAliveConfig(kacfg) == nil
}
return false
}

func SetKeepAliveConfigSockOpt(c Conn) bool {
if !settings.GetDialerOpts().LowerKeepAlive {
return false
}
// SetKeepAliveConfigSockOpt sets for a TCP connection, SO_KEEPALIVE,
// TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT, TCP_USER_TIMEOUT.
// args is optional, and should be in the order of idle, interval, count.
func SetKeepAliveConfigSockOpt(c MinConn, args ...int) (ok bool) {
var tc *net.TCPConn
if tc, ok = c.(*net.TCPConn); ok {
id := conn2str(tc)

if tc, ok := c.(*net.TCPConn); ok {
rawConn, err := tc.SyscallConn()
if err != nil || rawConn == nil {
ok = false
return ok
}

idle := defaultIdle // secs
interval := defaultInterval // secs
count := defaultCount
if len(args) >= 1 && args[0] > 0 {
idle = args[0]
}
if len(args) >= 2 && args[1] > 0 {
interval = args[1]
}
if len(args) >= 3 && args[2] > 0 {
count = args[2]
}
usertimeoutms := idle*1000 + (interval * count) // millis

ok = true
err = rawConn.Control(func(fd uintptr) {
sock := int(fd)
if err := syscall.SetsockoptInt(sock, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(true)); err != nil {
log.D("set SO_KEEPALIVE failed: %v", err)
log.D("set SO_KEEPALIVE %s failed: %v", id, err)
ok = false
}
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, defaultIdle); err != nil {
log.D("set TCP_KEEPIDLE failed: %v", err)
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, idle); err != nil {
log.D("set TCP_KEEPIDLE %s failed: %ds, %v", id, idle, err)
ok = false
}
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, defaultInterval); err != nil {
log.D("set TCP_KEEPINTVL failed: %v", err)
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, interval); err != nil {
log.D("set TCP_KEEPINTVL %s failed: %ds, %v", id, interval, err)
ok = false
}
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, defaultCount); err != nil {
log.D("set TCP_KEEPCNT failed: %v", err)
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, count); err != nil {
log.D("set TCP_KEEPCNT %s failed: #%d, %v", id, count, err)
ok = false
}
// code.googlesource.com/google-api-go-client/+/master/transport/grpc/dial_socketopt.go#30
if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usrTimeoutMillis); err != nil {
log.D("set TCP_USER_TIMEOUT failed: %v", err)
if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usertimeoutms); err != nil {
log.D("set TCP_USER_TIMEOUT %s failed: %dms, %v", id, usertimeoutms, err)
ok = false
}
})
if err != nil {
log.E("RawConn.Control() failed: %v", err)
log.E("dialers: sockopt: %s RawConn.Control() err: %v", id, err)
ok = false
}
return ok
}
return false
return ok
}

func boolint(b bool) int {
Expand Down
12 changes: 10 additions & 2 deletions intra/ipn/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ package ipn
import (
"context"
"math/rand"
"net"
"net/netip"
"time"

x "github.com/celzero/firestack/intra/backend"
"github.com/celzero/firestack/intra/core"
"github.com/celzero/firestack/intra/log"
"github.com/celzero/firestack/intra/protect"
"github.com/celzero/firestack/intra/settings"
)

var (
Expand Down Expand Up @@ -114,8 +116,7 @@ func (h *auto) Dial(network, addr string) (protect.Conn, error) {
h.exp.K(addr, who, ttl30s)
h.status.Store(TOK)
}
// adjust TCP keepalive config if c is a TCPConn
protect.SetKeepAliveConfigSockOpt(c)
maybeKeepAlive(c)
log.I("proxy: auto: w(%d) pin(%t/%d), dial(%s) %s; err? %v",
who, recent, previdx, network, addr, err)
return c, err
Expand Down Expand Up @@ -263,3 +264,10 @@ func ip4to6(prefix96 netip.Prefix, ip4 netip.Addr) netip.Addr {
}
return netip.AddrFrom16(s6)
}

func maybeKeepAlive(c net.Conn) {
if settings.GetDialerOpts().LowerKeepAlive {
// adjust TCP keepalive config if c is a TCPConn
core.SetKeepAliveConfigSockOpt(c)
}
}
4 changes: 1 addition & 3 deletions intra/ipn/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ func (h *base) Dial(network, addr string) (c protect.Conn, err error) {
}
defer localDialStatus(h.status, err)

//Adjust TCP keepalive config if c is a TCPConn
protect.SetKeepAliveConfigSockOpt(c)

maybeKeepAlive(c)
log.I("proxy: base: dial(%s) to %s; err? %v", network, addr, err)
return
}
Expand Down
3 changes: 1 addition & 2 deletions intra/ipn/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func (h *exit) Dial(network, addr string) (protect.Conn, error) {
// exit always splits
c, err := localDialStrat(h.outbound, network, addr)
defer localDialStatus(h.status, err)
// adjust TCP keepalive config if c is a TCPConn
protect.SetKeepAliveConfigSockOpt(c)
maybeKeepAlive(c)
log.I("proxy: exit: dial(%s) to %s; err? %v", network, addr, err)
return c, err
}
Expand Down

1 comment on commit 91437d1

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.