diff --git a/intra/dialers/split_and_desync.go b/intra/dialers/split_and_desync.go index f2de2609..e57abbbb 100644 --- a/intra/dialers/split_and_desync.go +++ b/intra/dialers/split_and_desync.go @@ -223,17 +223,8 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit for i := 0; i < desync_max_ttl-1 && measureTTL; i += desync_delta_ttl { _, cmsgN, _, from, err := unix.Recvmsg(udpFD, msgBuf[:], cmsgBuf[:], unix.MSG_ERRQUEUE) if err != nil { - log.V("split-desync: recvmsg %v, processed? %t, err: %v", ipp, processed, err) - if processed { // recvmsg processed at least once - break // uc must be nonblocking - } else { - processed = true - i -= desync_delta_ttl - time.Sleep(5 * time.Millisecond) - continue - } - } else { - processed = true + log.V("split-desync: recvmsg %v, err: %v", ipp, err) + break } cmsgs, err := unix.ParseSocketControlMessage(cmsgBuf[:cmsgN]) @@ -250,6 +241,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit break } oc.ttl = max(oc.ttl, ttl) + processed = true } } else { if exceedsTTL(cmsgs) { @@ -259,6 +251,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit break } oc.ttl = max(oc.ttl, ttl) + processed = true } } } @@ -266,7 +259,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit // skip desync if no measurement is done oc.used.Store(!processed) - log.D("split-desync: done: %v, ok? %t, ttl: %d", ipp, processed, oc.ttl) + log.D("split-desync: done: %v, used? %t, ttl: %d", ipp, oc.used.Load(), oc.ttl) return oc, nil } @@ -346,26 +339,29 @@ func (s *overwriteSplitter) Read(b []byte) (int, error) { return s.conn.Read(b) // Write implements DuplexConn. // ref: github.com/hufrea/byedpi/blob/82e5229df00/desync.c#L69-L123 -func (s *overwriteSplitter) Write(b []byte) (int, error) { +func (s *overwriteSplitter) Write(b []byte) (n int, err error) { conn := s.conn - - if s.used.Load() { - // after the first write, there is no special write behavior. - // used may also be set to true to avoid desync. - return conn.Write(b) - } - - // set `used` to ensure this code only runs once per conn. - if !s.used.CompareAndSwap(false, true) { - return conn.Write(b) - } - laddr := laddr(s.conn) raddr := raddr(s.conn) - if len(b) <= len(s.payload) { - log.D("split-desync: write: no desync %s => %s; len(b) <= len(s.payload): %d <= %d", laddr, raddr, len(b), len(s.payload)) - return conn.Write(b) + short := len(b) < len(s.payload) + swapped := false + used := s.used.Load() + if used { + // after the first write, there is no special write behavior. + // used may also be set to true to avoid desync. + n, err = conn.Write(b) + } else if swapped = s.used.CompareAndSwap(false, true); !swapped { + // set `used` to ensure this code only runs once per conn; + // if !swapped, some other goroutine has already swapped it. + n, err = conn.Write(b) + } else if short { + n, err = conn.Write(b) + } + if used || short || !swapped { + logeif(err)("split-desync: write: %s => %s; desync done %d; (used? %t, short? %t, race? %t); err? %v", + laddr, raddr, n, used, short, !swapped, err) + return n, err } rawConn, err := conn.SyscallConn()