Skip to content

Commit

Permalink
dialers/desync: retry on recvmsg errs do not work
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Aug 12, 2024
1 parent 71865a6 commit 9a33c57
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions intra/dialers/split_and_desync.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,8 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit
for i := 0; i < desync_max_ttl-1 && measureTTL; i += desync_delta_ttl {
_, cmsgN, _, from, err := unix.Recvmsg(udpFD, msgBuf[:], cmsgBuf[:], unix.MSG_ERRQUEUE)
if err != nil {
log.V("split-desync: recvmsg %v, processed? %t, err: %v", ipp, processed, err)
if processed { // recvmsg processed at least once
break // uc must be nonblocking
} else {
processed = true
i -= desync_delta_ttl
time.Sleep(5 * time.Millisecond)
continue
}
} else {
processed = true
log.V("split-desync: recvmsg %v, err: %v", ipp, err)
break
}

cmsgs, err := unix.ParseSocketControlMessage(cmsgBuf[:cmsgN])
Expand All @@ -250,6 +241,7 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit
break
}
oc.ttl = max(oc.ttl, ttl)
processed = true
}
} else {
if exceedsTTL(cmsgs) {
Expand All @@ -259,14 +251,15 @@ func desyncWithTraceroute(d *protect.RDial, ipp netip.AddrPort) (*overwriteSplit
break
}
oc.ttl = max(oc.ttl, ttl)
processed = true
}
}
}

// skip desync if no measurement is done
oc.used.Store(!processed)

log.D("split-desync: done: %v, ok? %t, ttl: %d", ipp, processed, oc.ttl)
log.D("split-desync: done: %v, used? %t, ttl: %d", ipp, oc.used.Load(), oc.ttl)

return oc, nil
}
Expand Down Expand Up @@ -346,26 +339,29 @@ func (s *overwriteSplitter) Read(b []byte) (int, error) { return s.conn.Read(b)

// Write implements DuplexConn.
// ref: github.com/hufrea/byedpi/blob/82e5229df00/desync.c#L69-L123
func (s *overwriteSplitter) Write(b []byte) (int, error) {
func (s *overwriteSplitter) Write(b []byte) (n int, err error) {
conn := s.conn

if s.used.Load() {
// after the first write, there is no special write behavior.
// used may also be set to true to avoid desync.
return conn.Write(b)
}

// set `used` to ensure this code only runs once per conn.
if !s.used.CompareAndSwap(false, true) {
return conn.Write(b)
}

laddr := laddr(s.conn)
raddr := raddr(s.conn)

if len(b) <= len(s.payload) {
log.D("split-desync: write: no desync %s => %s; len(b) <= len(s.payload): %d <= %d", laddr, raddr, len(b), len(s.payload))
return conn.Write(b)
short := len(b) < len(s.payload)
swapped := false
used := s.used.Load()
if used {
// after the first write, there is no special write behavior.
// used may also be set to true to avoid desync.
n, err = conn.Write(b)
} else if swapped = s.used.CompareAndSwap(false, true); !swapped {
// set `used` to ensure this code only runs once per conn;
// if !swapped, some other goroutine has already swapped it.
n, err = conn.Write(b)
} else if short {
n, err = conn.Write(b)
}
if used || short || !swapped {
logeif(err)("split-desync: write: %s => %s; desync done %d; (used? %t, short? %t, race? %t); err? %v",
laddr, raddr, n, used, short, !swapped, err)
return n, err
}

rawConn, err := conn.SyscallConn()
Expand Down

1 comment on commit 9a33c57

@ignoramous
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#81

Please sign in to comment.