Skip to content

Commit

Permalink
ipn/proxies: impl load balancing among chosen proxies
Browse files Browse the repository at this point in the history
all: impl fmt.Stringer where possible
  • Loading branch information
ignoramous committed Oct 24, 2024
1 parent 84a4960 commit c53b4e5
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 108 deletions.
3 changes: 2 additions & 1 deletion intra/backend/dnsx_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type DNSOpts struct {
NOBLOCK bool
}

func (s *DNSSummary) Str() string {
// String implements fmt.Stringer.
func (s *DNSSummary) String() string {
return fmt.Sprintf("type: %s, id: %s, latency: %f, qname: %s, rdata: %s, rcode: %d, rttl: %d, server: %s, relay: %s, status: %d, blocklists: %s, msg: %s, loc: %s",
s.Type, s.ID, s.Latency, s.QName, s.RData, s.RCode, s.RTtl, s.Server, s.RelayServer, s.Status, s.Blocklists, s.Msg, s.Region)
}
Expand Down
75 changes: 61 additions & 14 deletions intra/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type baseHandler struct {

tunMode *settings.TunMode
resolver dnsx.Resolver // dns resolver to forward queries to
prox ipn.Proxies // proxy provider
smmch chan *SocketSummary
listener SocketListener // listener for socket summaries

Expand All @@ -80,12 +81,13 @@ type baseHandler struct {

var _ netstack.GBaseConnHandler = (*baseHandler)(nil)

func newBaseHandler(pctx context.Context, proto string, r dnsx.Resolver, tm *settings.TunMode, l SocketListener) *baseHandler {
func newBaseHandler(pctx context.Context, proto string, r dnsx.Resolver, px ipn.Proxies, tm *settings.TunMode, l SocketListener) *baseHandler {
h := &baseHandler{
ctx: pctx,
proto: proto,
tunMode: tm,
resolver: r,
prox: px,
smmch: make(chan *SocketSummary, smmchSize),
listener: l,
fwtracker: core.NewExpiringMap[string, string](pctx),
Expand Down Expand Up @@ -130,7 +132,7 @@ func (h *baseHandler) onFlow(localaddr, target netip.AddrPort) (fm *Mark, undidA
if blockmode == settings.BlockModeSink {
return // blocks
} else if blockmode == settings.BlockModeNone {
fm = optionsBase
fm = optionsExit
} // else: BlockModeFilter|BlockModeFilterProc

// Implicit: BlockModeFilter or BlockModeFilterProc
Expand Down Expand Up @@ -215,9 +217,9 @@ func (h *baseHandler) onFlow(localaddr, target netip.AddrPort) (fm *Mark, undidA
if fm == nil || !ok { // zeroListener returns nil
log.W("onFlow: %s empty res or on flow timeout %t; block!", h.proto, ok)
fm = optionsBlock
} else if len(fm.PID) <= 0 {
} else if len(fm.PIDCSV) <= 0 {
log.E("onFlow: %s no pid from kt; exit!", h.proto)
fm.PID = ipn.Exit
fm.PIDCSV = ipn.Exit
}

return
Expand Down Expand Up @@ -266,13 +268,13 @@ func (h *baseHandler) queueSummary(s *SocketSummary) {
log.VV("%s: queueSummary: %x %x %s", h.proto, h.smmch, h.ctx, s.ID)
select {
case <-h.ctx.Done():
log.D("%s: queueSummary: end: %s", h.proto, s.str())
log.D("%s: queueSummary: end: %s", h.proto, s)
default:
select {
case <-h.ctx.Done():
case h.smmch <- s:
default:
log.W("%s: sendSummary: dropped: %s", h.proto, s.str())
log.W("%s: sendSummary: dropped: %s", h.proto, s)
}
}
}
Expand Down Expand Up @@ -303,7 +305,7 @@ func (h *baseHandler) sendSummary(s *SocketSummary, after time.Duration) {
time.Sleep(after)
}

log.VV("%s: end? sendNotif: %s", h.proto, s.str())
log.VV("%s: end? sendNotif: %s", h.proto, s)
h.listener.OnSocketClosed(s) // s.Duration may be uninitialized (zero)
}

Expand All @@ -324,9 +326,22 @@ func (h *baseHandler) CloseConns(cids []string) (closed []string) {
return closed
}

// TODO: move this to ipn.Block
func (h *baseHandler) stall(k string) (secs uint32) {
if n := h.fwtracker.Get(k); n <= 0 {
// aux is usually dst domains, ip, ip:port
func (h *baseHandler) flowID(uid string, aux ...string) (fid string) {
if len(uid) <= 0 { // uid may be empty
uid = UNKNOWN_UID_STR
} // or: uid may be unknown
fid = uid
for _, v := range aux {
if len(v) > 0 { // choose the first non-empty aux
return fid + v
}
}
return
}

func (h *baseHandler) stall(flowid string) (secs uint32) {
if n := h.fwtracker.Get(flowid); n <= 0 {
secs = 0 // no stall
} else if n > 30 {
secs = 30 // max up to 30s
Expand All @@ -338,7 +353,7 @@ func (h *baseHandler) stall(k string) (secs uint32) {
// track uid->target for n secs, or 30s if n is 0
life30s := ((29 + secs) % 30) + 1
newlife := time.Duration(life30s) * time.Second
h.fwtracker.Set(k, newlife)
h.fwtracker.Set(flowid, newlife)
if secs > 0 {
w := time.Duration(secs) * time.Second
time.Sleep(w)
Expand Down Expand Up @@ -521,12 +536,40 @@ func filterFamilyForDialing(ipcsv string) string {
return strings.Join(filtered, ",")
}

// returns proxy-id, conn-id, user-id
func splitCidPidUid(decision *Mark) (cid, pid, uid string) {
// returns conn-id, proxy-id, user-id, flow-id.
// conn-id may be empty.
func (h *baseHandler) judge(decision *Mark, aux ...string) (cid, pid, uid, fid string) {
if decision == nil {
return
}
return decision.CID, decision.PID, decision.UID

if len(decision.UID) > 0 {
uid = decision.UID
} else {
uid = UNKNOWN_UID_STR
}
cid = decision.CID
fid = h.flowID(uid, aux...)
pid = h.selectPid(decision.PIDCSV, fid)
return
}

func (h *baseHandler) selectPid(pidcsv, fid string) string {
if len(pidcsv) <= 0 {
return ipn.Block
}
all := strings.Split(pidcsv, ",")
if firstEmpty(all) {
return ipn.Block
}

pid, err := h.prox.PinOne(fid, all)
logev(err)("intra: selectPid: %s for %s; err? %v", pid, fid, err)
if err != nil {
return ipn.Block
} else {
return pid
}
}

func conn2str(a net.Conn, b net.Conn) string {
Expand Down Expand Up @@ -554,3 +597,7 @@ func ntoa(n string) int32 {
}
return UNSUPPORTED_NETWORK
}

func firstEmpty(arr []string) bool {
return len(arr) <= 0 || len(arr[0]) <= 0
}
9 changes: 4 additions & 5 deletions intra/core/expiringsieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ type Sieve[K comparable, V any] struct {
}

// NewSieve returns a new Sieve with keys expiring after lifetime.
func NewSieve[K comparable, V any](lifetime time.Duration) *Sieve[K, V] {
func NewSieve[K comparable, V any](ctx context.Context, dur time.Duration) *Sieve[K, V] {
return &Sieve[K, V]{
// TODO: with context.TODO, expmap's reaper goroutine will leak.
c: NewExpiringMapLifetime[K, V](context.TODO(), lifetime),
c: NewExpiringMapLifetime[K, V](ctx, dur),
}
}

Expand All @@ -46,6 +45,6 @@ func (s *Sieve[K, V]) Len() int {
}

// Clear removes all elements from the sieve.
func (s *Sieve[K, V]) Clear() {
s.c.Clear()
func (s *Sieve[K, V]) Clear() int {
return s.c.Clear()
}
6 changes: 5 additions & 1 deletion intra/dialers/split_and_desync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package dialers

import (
"context"
secrand "crypto/rand"
"io"
"math/rand"
Expand All @@ -32,11 +33,14 @@ const (
desync_noop_ttl = 3
desync_delta_ttl = 1
desync_invalid_ttl = -1

desync_cache_ttl = 30 * time.Second
)

// ttlcache stores the TTL for a given IP address for a limited time.
// TODO: invalidate cache on network changes.
var ttlcache = core.NewSieve[netip.Addr, int](30 * time.Second)
// TODO: with context.TODO, expmap's reaper goroutine will leak.
var ttlcache = core.NewSieve[netip.Addr, int](context.TODO(), desync_cache_ttl)

// Combines direct split with TCB Desynchronization Attack
// Inspired by byedpi: github.com/hufrea/byedpi/blob/82e5229df00/desync.c#L69-L123
Expand Down
10 changes: 6 additions & 4 deletions intra/dns53/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (t *dnssd) Query(_ string, q *dns.Msg, smm *x.DNSSummary) (ans *dns.Msg, er
smm.Server = t.GetAddr()

defer func() {
log.D("mdns: err: %v; summary: %s", err, smm.Str())
log.D("mdns: err: %v; summary: %s", err, smm)
}()

start := time.Now()
Expand Down Expand Up @@ -253,8 +253,10 @@ type client struct {
closed atomic.Bool // 0: open, 1: closed
}

func (c *client) str() string {
return fmt.Sprintf("use4/6? %t/%t; oneshot? %t; tracked %d; closed %t", c.use4, c.use6, c.oneshot, len(c.tracker), c.closed.Load())
// String implements fmt.Stringer
func (c *client) String() string {
return fmt.Sprintf("use4/6? %t/%t; oneshot? %t; tracked %d; closed %t",
c.use4, c.use6, c.oneshot, len(c.tracker), c.closed.Load())
}

// newClient creates a new mdns unicast and multicast client
Expand Down Expand Up @@ -323,7 +325,7 @@ func (c *client) Close() error {
}
c.once.Do(func() {
c.closed.Store(true)
log.I("mdns: closing client %v", c.str())
log.I("mdns: closing client %s", c)

core.CloseUDP(c.unicast4)
core.CloseUDP(c.unicast6)
Expand Down
2 changes: 1 addition & 1 deletion intra/dnscrypt/servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestOne(t *testing.T) {
// FIXME: querying always fails with EOF
ans, err := tr.Query(netw, q, smm)
if err != nil {
log.Output(2, smm.Str())
log.Output(2, smm.String())
t.Fatal(err)
}
if xdns.Len(ans) == 0 {
Expand Down
21 changes: 10 additions & 11 deletions intra/dnsx/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package dnsx
import (
"context"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"strconv"
Expand Down Expand Up @@ -122,7 +123,7 @@ func NewCachingTransport(t Transport, ttl time.Duration) Transport {
reqbarrier: core.NewBarrier[*cres](battl),
hangover: core.NewHangover(),
}
log.I("cache: (%s) setup: %s; opts: %s", ct.ID(), ct.GetAddr(), ct.str())
log.I("cache: (%s) setup: %s; opts: %s", ct.ID(), ct.GetAddr(), ct)
return ct
}

Expand All @@ -139,16 +140,14 @@ func (c *cres) copy() *cres {
}
}

// String implements fmt.Stringer
func (cr *cres) String() string {
return cr.str()
return fmt.Sprintf("bumps=%d; expiry=%s; s=%s", cr.bumps, cr.expiry, cr.s)
}

func (cr *cres) str() string {
return "bumps=" + strconv.Itoa(cr.bumps) + "; expiry=" + cr.expiry.String() + "; s=" + cr.s.Str()
}

func (t *ctransport) str() string {
return "ttl=" + t.ttl.String() + ";bumps=" + strconv.Itoa(t.bumps) + ";size=" + strconv.Itoa(t.size)
// String implements fmt.Stringer
func (t *ctransport) String() string {
return fmt.Sprintf("ttl=%s; bumps=%d; size=%d", t.ttl, t.bumps, t.size)
}

func hash(s string) uint8 {
Expand Down Expand Up @@ -269,7 +268,7 @@ func (cb *cache) put(key string, ans *dns.Msg, s *x.DNSSummary) (ok bool) {
}
cb.c[key] = v

log.D("cache: put(%s): l(%t/%d); %s", key, xdns.HasAnyAnswer(ans), xdns.Len(ans), v.str())
log.D("cache: put(%s): l(%t/%d); %s", key, xdns.HasAnyAnswer(ans), xdns.Len(ans), v)

ok = true
return
Expand Down Expand Up @@ -388,10 +387,10 @@ func (t *ctransport) fetch(network string, q *dns.Msg, summary *x.DNSSummary, cb
var cachedsummary *x.DNSSummary
hasans := v.ans != nil

log.D("cache: hit(k: %s / stale? %t / ans? %t): %s", key, !isfresh, hasans, v.str())
log.D("cache: hit(k: %s / stale? %t / ans? %t): %s", key, !isfresh, hasans, v)
r, cachedsummary, err := asResponse(q, v, isfresh) // return cached response, may be stale
if err != nil {
log.W("cache: hit(k: %s) %s, but err? %v", key, v.str(), err)
log.W("cache: hit(k: %s) %s, but err? %v", key, v, err)
if err == errCacheResponseMismatch {
// FIXME: this is a hack to fix the issue where the cache
// returns a response that does not match the query.
Expand Down
4 changes: 2 additions & 2 deletions intra/dnsx/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ func (r *resolver) queueSummary(smm *x.DNSSummary) {
}
select {
case <-r.ctx.Done():
log.W("dns: fwd: smms closed; dropping %s", smm.Str())
log.W("dns: fwd: smms closed; dropping %s", smm)
default:
select {
case <-r.ctx.Done():
case r.smms <- smm:
default:
log.W("dns: fwd: smms full; dropping %s", smm.Str())
log.W("dns: fwd: smms full; dropping %s", smm)
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions intra/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

type icmpHandler struct {
*baseHandler
prox ipn.Proxies
}

const (
Expand All @@ -37,8 +36,7 @@ var _ netstack.GICMPHandler = (*icmpHandler)(nil)

func NewICMPHandler(pctx context.Context, resolver dnsx.Resolver, prox ipn.Proxies, tunMode *settings.TunMode, listener Listener) netstack.GICMPHandler {
h := &icmpHandler{
baseHandler: newBaseHandler(pctx, "icmp", resolver, tunMode, listener),
prox: prox,
baseHandler: newBaseHandler(pctx, "icmp", resolver, prox, tunMode, listener),
}

go h.processSummaries()
Expand Down Expand Up @@ -69,7 +67,7 @@ func (h *icmpHandler) Ping(msg []byte, source, target netip.AddrPort) (echoed bo
res, undidAlg, realips, doms := h.flow(source, target)
dst := oneRealIPPort(realips, target)
// on Android, uid is always "unknown" for icmp
cid, pid, uid := splitCidPidUid(res)
cid, pid, uid, _ := h.judge(res, doms, target.String())
smm := icmpSummary(cid, pid, uid)

defer func() {
Expand Down
Loading

1 comment on commit c53b4e5

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#81

Please sign in to comment.