Skip to content

Commit

Permalink
dialers: retrier for desync strat
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Aug 13, 2024
1 parent bd5f87c commit 33edc56
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 43 deletions.
27 changes: 17 additions & 10 deletions intra/dialers/direct_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ package dialers
import (
"io"
"net"
"sync/atomic"

"github.com/celzero/firestack/intra/core"
"github.com/celzero/firestack/intra/protect"
"github.com/celzero/firestack/intra/settings"
)

type splitter struct {
*net.TCPConn
used bool // Initially false. Becomes true after the first write.
strat int32
used atomic.Bool // Initially false. Becomes true after the first write.
}

var _ core.DuplexConn = (*splitter)(nil)
Expand All @@ -40,25 +43,29 @@ func DialWithSplit(d *protect.RDial, addr *net.TCPAddr) (*splitter, error) {
if conn == nil {
return nil, net.UnknownNetworkError("no conn")
}
return &splitter{TCPConn: conn}, nil
strat := settings.DialStrategy.Load()
return &splitter{TCPConn: conn, strat: strat}, nil
}

// Write-related functions
func (s *splitter) Write(b []byte) (n int, err error) {
conn := s.TCPConn
if s.used {
// After the first write, there is no special write behavior.
strat := s.strat
if s.used.Load() {
// after the first write, there is no special write behavior.
return conn.Write(b)
} else if ok := s.used.CompareAndSwap(false, true); ok {
// setting `used` to true ensures that this code only runs once per socket.
n, _, err = writeSplit(strat, conn, b)
return n, err
} else {
// if `used` is already swapped, then the split has already been done.
return conn.Write(b)
}

// Setting `used` to true ensures that this code only runs once per socket.
s.used = true
n, _, err = writeSplit(conn, b)
return n, err
}

func (s *splitter) ReadFrom(reader io.Reader) (bytes int64, err error) {
if !s.used {
if !s.used.Load() {
// This is the first write on this socket.
// Use copyOnce(), which calls Write(), to get Write's splitting behavior for
// the first segment.
Expand Down
55 changes: 40 additions & 15 deletions intra/dialers/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func (zeroNetAddr) String() string { return "none" }
// be typecastable to *net.TCPConn (see: xdial.DialTCP)
// inheritance: go.dev/play/p/mMiQgXsPM7Y
type retrier struct {
dial *protect.RDial
raddr *net.TCPAddr
dial *protect.RDial
dialStrat int32
raddr *net.TCPAddr

// Flags indicating whether the caller has called CloseRead and CloseWrite.
readDone atomic.Bool
Expand All @@ -65,8 +66,8 @@ type retrier struct {
// the current underlying connection. It is only modified by the reader
// thread, so the reader functions may access it without acquiring a lock.
// nb: if embedding TCPConn; override its WriteTo instead of just ReadFrom
// as io.Copy prefers WriteTo over ReadFrom
conn *net.TCPConn
// as io.Copy prefers WriteTo over ReadFrom; or use core.Pipe
conn core.DuplexConn

// External read and write deadlines. These need to be stored here so that
// they can be re-applied in the event of a retry.
Expand Down Expand Up @@ -124,9 +125,9 @@ func calcTimeout(before, after time.Time) time.Duration {
// Read and CloseRead, and another calling Write, ReadFrom, and CloseWrite.
// `dialer` will be used to establish the connection.
// `addr` is the destination.
func DialWithSplitRetry(dial *protect.RDial, addr *net.TCPAddr) (*retrier, error) {
func DialWithSplitRetry(d *protect.RDial, addr *net.TCPAddr) (*retrier, error) {
before := time.Now()
conn, err := dial.DialTCP(addr.Network(), nil, addr)
conn, err := d.DialTCP(addr.Network(), nil, addr)
if err != nil {
log.E("rdial: tcp addr %s: err %v", addr, err)
return nil, err
Expand All @@ -135,7 +136,8 @@ func DialWithSplitRetry(dial *protect.RDial, addr *net.TCPAddr) (*retrier, error

r := &retrier{
conn: conn,
dial: dial,
dial: d,
dialStrat: settings.DialStrategy.Load(),
raddr: addr,
timeout: calcTimeout(before, after),
retryDoneCh: make(chan struct{}),
Expand All @@ -150,15 +152,26 @@ func DialWithSplitRetry(dial *protect.RDial, addr *net.TCPAddr) (*retrier, error
// split TLS client hello messages could not be written.
func (r *retrier) retryWriteReadLocked(buf []byte) (n int, err error) {
clos(r.conn) // close provisional socket
var newConn *net.TCPConn
if newConn, err = r.dial.DialTCP(r.raddr.Network(), nil, r.raddr); err != nil {
log.E("rdial: retryLocked: dial %s: err %v", r.raddr, err)
return
var newConn core.DuplexConn

switch r.dialStrat {
case settings.DesyncStrategy:
if newConn, err = dialWithSplitAndDesync(r.dial, r.raddr.AddrPort()); err != nil {
log.E("rdial: retryLocked: dialDesync %s: err %v", r.raddr, err)
return
}
case settings.SplitTCPStrategy, settings.SplitTCPOrTLSStrategy:
fallthrough
default:
if newConn, err = r.dial.DialTCP(r.raddr.Network(), nil, r.raddr); err != nil {
log.E("rdial: retryLocked: dialTCP %s: err %v", r.raddr, err)
return
}
}

r.conn = newConn
n, split, err := writeSplit(r.conn, r.hello)
logeif(err)("rdial: retryLocked: %s->%s; split? %d; write? %d/%d; err? %v", laddr(r.conn), r.raddr, split, n, len(r.hello), err)
n, split, err := r.writeSplitLocked()
logeif(err)("rdial: retryLocked: strat(%d) %s->%s; split? %d; write? %d/%d; err? %v", r.dialStrat, laddr(r.conn), r.raddr, split, n, len(r.hello), err)
if err != nil {
return
}
Expand Down Expand Up @@ -448,18 +461,30 @@ func getTLSClientHelloRecordLen(h []byte) (uint16, bool) {
return binary.BigEndian.Uint16(h[3:5]), true
}

func writeSplit(w net.Conn, b []byte) (n, splitLen int, err error) {
switch settings.DialStrategy.Load() {
func (r *retrier) writeSplitLocked() (n, splitLen int, err error) {
return writeSplit(r.dialStrat, r.conn, r.hello)
}

func writeSplit(strat int32, w net.Conn, b []byte) (n, splitLen int, err error) {
switch strat {
case settings.SplitTCPStrategy:
n, splitLen, err = writeTCPSplit(w, b)
case settings.SplitTCPOrTLSStrategy:
n, splitLen, err = writeTCPOrTLSSplit(w, b)
case settings.DesyncStrategy:
n, err = writeDesync(w, b)
// desync does not always split
splitLen = len(desync_http1_1str)
default:
n, err = w.Write(b)
}
return
}

func writeDesync(w io.Writer, b []byte) (n int, err error) {
return w.Write(b)
}

func writeTCPSplit(w net.Conn, hello []byte) (n, splitLen int, err error) {
var p, q int
to := raddr(w)
Expand Down
24 changes: 13 additions & 11 deletions intra/dialers/split_and_desync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
)

const (
probeSize = 8
http1_1str = "POST / HTTP/1.1\r\nHost: 10.0.0.1\r\nContent-Type: application/octet-stream\r\nContent-Length: 9999999\r\n\r\n"

// relaxed is a flag to enable relaxed mode, which lets connections go through without desync.
relaxed = true

probeSize = 8
default_ttl = 64

// desync_relaxed enables relaxed mode, which lets connections go through without desync.
desync_relaxed = true
desync_http1_1str = "POST / HTTP/1.1\r\nHost: 10.0.0.1\r\nContent-Type: application/octet-stream\r\nContent-Length: 9999999\r\n\r\n"
// from: github.com/bol-van/zapret/blob/c369f11638/nfq/darkmagic.h#L214-L216
desync_max_ttl = 20
desync_noop_ttl = 3
Expand Down Expand Up @@ -188,7 +187,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit
logeif(err)("split-desync: dialUDP %v %d: err? %v", ipp, udpFD, err)
if err != nil {
measureTTL = false
if !relaxed {
if !desync_relaxed {
return nil, err
}
}
Expand Down Expand Up @@ -221,7 +220,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit
oc := &overwriteSplitter{
conn: tcpConn,
ttl: desync_noop_ttl,
payload: []byte(http1_1str),
payload: []byte(desync_http1_1str),
ip6: isIPv6,
}

Expand Down Expand Up @@ -285,19 +284,19 @@ func desyncWithFixedTtl(d *protect.RDial, ipp netip.AddrPort, initialTTL int) (*
s := &overwriteSplitter{
conn: tcpConn,
ttl: initialTTL,
payload: []byte(http1_1str),
payload: []byte(desync_http1_1str),
ip6: ipp.Addr().Is6(),
}
// skip desync if no measurement is done
s.used.Store(initialTTL == desync_invalid_ttl)
s.used.Store(s.ttl == desync_invalid_ttl)
return s, nil
}

// DialWithSplitAndDesync estimates the TTL with UDP traceroute,
// then returns a TCP connection that may launch TCB Desynchronization
// and split the initial upstream segment.
// ref: github.com/bol-van/zapret/blob/c369f11638/docs/readme.eng.md#dpi-desync-attack
func DialWithSplitAndDesync(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplitter, error) {
func dialWithSplitAndDesync(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplitter, error) {
ttl, ok := ttlcache.Get(ipp.Addr())
if ok {
return desyncWithFixedTtl(d, ipp, ttl)
Expand Down Expand Up @@ -434,7 +433,10 @@ func (s *overwriteSplitter) Write(b []byte) (n int, err error) {
return n1, err
}

// restore the first-half of the payload so that it gets picked up on retranmission.
copy(firstSegment, b[:len(s.payload)])

// restore to default TTL
if s.ip6 {
err = unix.SetsockoptInt(sockFD, unix.IPPROTO_IPV6, unix.IPV6_UNICAST_HOPS, default_ttl)
} else {
Expand Down
3 changes: 2 additions & 1 deletion intra/dialers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func adaptp(f mkpconn) mkrconn[*net.PacketConn] {
}

func unPtr[P any, Q any](p *P, q Q) (P, Q) {
if core.IsNil(p) {
// go.dev/play/p/XRrCepATeIi
if p == nil || core.IsNil(p) {
var zz P
return zz, q
}
Expand Down
7 changes: 1 addition & 6 deletions intra/ipn/proxies.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,5 @@ func idling(t time.Time) bool {
}

func localDialStrat(d *protect.RDial, network, addr string) (protect.Conn, error) {
switch settings.DialStrategy.Load() {
case settings.DesyncStrategy:
return dialers.DesyncDial(d, network, addr)
default:
return dialers.SplitDial(d, network, addr)
}
return dialers.SplitDial(d, network, addr)
}

6 comments on commit 33edc56

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#81

@Lanius-collaris
Copy link
Contributor

@Lanius-collaris Lanius-collaris commented on 33edc56 Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reserve as least one strategy that use TLS record fragmentation first? I really need such strategy to prevent residual censorship, that's why I'm against retrier.

@Lanius-collaris
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remove my username from contributors_list?

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

contributors_list

Sure. From GitHub? Or, the About page in the app?

Why not reserve as least one strategy that use TLS record fragmentation first?

Such strategy never did exist for TLS (it existed for TCP, but never used). Will consider adding it for Desync and TLS, too.

need such strategy to prevent residual censorship

Does "residual censorship" here mean the censor flags source IPs for deeper inspection when they detect at least one attempted access to a censored service?

@Lanius-collaris
Copy link
Contributor

@Lanius-collaris Lanius-collaris commented on 33edc56 Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. From GitHub? Or, the About page in the app?

From the About page in the app. Thanks

Does "residual censorship" here mean the censor flags source IPs for deeper inspection when they detect at least one attempted access to a censored service?

https://datatracker.ietf.org/doc/html/rfc9505/#name-residual-censorship

@ignoramous
Copy link
Contributor Author

@ignoramous ignoramous commented on 33edc56 Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reserve as least one strategy that use TLS record fragmentation first? I really need such strategy to prevent residual censorship, that's why I'm against retrier.

5705e08

From the About page in the app. Thanks

Done.

Please sign in to comment.