Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically change a udp multicast peer's read buffer #99

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions multicast/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,15 @@ func (p *UDPPeer) unblockIPv4(multicastIP, sourceIP netip.Addr) (err error) {
func (p *UDPPeer) unblockIPv6(multicastIP, sourceIP netip.Addr) (err error) {
panic("IPv6 multicast peer not yet supported")
}

func (p *UDPPeer) Read(b []byte) (int, netip.AddrPort, error) {
return p.socket.RecvFrom(b, 0)
}

func (p *UDPPeer) SetAsyncReadBuffer(to []byte) {
p.read.b = to
}

func (p *UDPPeer) AsyncRead(b []byte, fn func(error, int, netip.AddrPort)) {
p.read.b = b
p.read.fn = fn
Expand Down
122 changes: 122 additions & 0 deletions multicast/peer_ipv4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"net/netip"
"sort"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1974,3 +1975,124 @@ func TestUDPPeerIPv4_ReaderWriter(t *testing.T) {
}
}
}

func TestUDPPeerIPv4_MultipleReadersSameBuffer(t *testing.T) {
ioc := sonic.MustIO()
defer ioc.Close()

var (
ips = []string{"224.0.0.19", "224.0.0.20"}
ports = []int{1234, 4321}
addrs []netip.AddrPort
)
for i := 0; i < 2; i++ {
addr, err := netip.ParseAddrPort(
fmt.Sprintf("%s:%d", ips[i], ports[i]))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, addr)
}

var (
chunk1, chunk2 [4]byte
b []byte
parity = 0
readers []*UDPPeer

read []int
)

for i := 0; i < 2; i++ {
r, err := NewUDPPeer(ioc, "udp", addrs[i].String())
if err != nil {
t.Fatal(err)
}
defer r.Close()

if err := r.Join(IP(ips[i])); err != nil {
t.Fatalf("reader could not join %s", ips[i])
} else {
log.Printf("reader joined group %s", ips[i])
}

id := i

var fn func(error, int, netip.AddrPort)
fn = func(err error, _ int, _ netip.AddrPort) {
if err != nil {
t.Fatal(err)
}

v := binary.BigEndian.Uint32(b)
log.Printf(
"reader %d read %d from %p",
id,
v,
b,
)
read = append(read, int(v))

parity++
if parity%2 == 0 {
b = chunk1[:]
} else {
b = chunk2[:]
}

for _, reader := range readers {
reader.SetAsyncReadBuffer(b)
}
Comment on lines +2043 to +2045
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this indeed needs to be on every AsyncRead when having multiple readers, then might be a good idea to introduce a peers manager that provides you with an AsyncRead that already takes care of this

Copy link
Collaborator Author

@sergiu128 sergiu128 Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed! I think this functionality should be in ava:gateway/lib though such that multiple exchanges can use it without worrying about this detail. I'll make it part of this PR

r.AsyncRead(b, fn)
}
b = chunk1[:]
r.AsyncRead(b, fn)

readers = append(readers, r)
}

var writers []*UDPPeer
for i := 0; i < 2; i++ {
w, err := NewUDPPeer(ioc, "udp", "")
if err != nil {
t.Fatal(err)
}
defer w.Close()

writers = append(writers, w)
}

var wb [4]byte
const Nops = 32

for i := 0; i < Nops; i++ {
time.Sleep(time.Millisecond)

ix := i % 2
binary.BigEndian.PutUint32(wb[:], uint32(i))

_, err := writers[ix].Write(wb[:], addrs[ix])
if err != nil && err != sonicerrors.ErrWouldBlock {
t.Fatalf("on the %d write err=%v", i, err)
}

for j := 0; j < Nops; j++ {
ioc.PollOne()
}
}
for j := 0; j < Nops; j++ {
ioc.PollOne()
}

// assert
sort.Ints(read)

if len(read) != Nops {
t.Fatal("did not read correctly")
}
for i := 0; i < Nops; i++ {
if read[i] != i {
t.Fatal("did not read correctly")
}
}
}