Skip to content

Commit

Permalink
Merge pull request #3142 from lucas-clemente/batched-read
Browse files Browse the repository at this point in the history
use batched reads
  • Loading branch information
marten-seemann authored Jul 6, 2021
2 parents fa2e797 + 870c759 commit 2054732
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 21 deletions.
4 changes: 4 additions & 0 deletions conn_helper_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = 0x2e
)

// ReadBatch only returns a single packet on OSX,
// see https://godoc.org/golang.org/x/net/ipv4#PacketConn.ReadBatch.
const batchSize = 1
2 changes: 2 additions & 0 deletions conn_helper_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = 0x7
msgTypeIPv6PKTINFO = 0x2e
)

const batchSize = 8
2 changes: 2 additions & 0 deletions conn_helper_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = unix.IPV6_PKTINFO
)

const batchSize = 8 // needs to smaller than MaxUint8 (otherwise the type of oobConn.readPos has to be changed)
79 changes: 60 additions & 19 deletions conn_oob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@ import (
"time"
"unsafe"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"golang.org/x/sys/unix"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)

const ecnMask uint8 = 0x3
const (
ecnMask = 0x3
oobBufferSize = 128
)

// Contrary to what the naming suggests, the ipv{4,6}.Message is not dependent on the IP version.
// They're both just aliases for x/net/internal/socket.Message.
// This means we can use this struct to read from a socket that receives both IPv4 and IPv6 messages.
var _ ipv4.Message = ipv6.Message{}

type batchConn interface {
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}

func inspectReadBuffer(c interface{}) (int, error) {
conn, ok := c.(interface {
Expand All @@ -43,7 +57,12 @@ func inspectReadBuffer(c interface{}) (int, error) {

type oobConn struct {
OOBCapablePacketConn
oobBuffer []byte
batchConn batchConn

readPos uint8
// Packets received from the kernel, but not yet returned by ReadPacket().
messages []ipv4.Message
buffers [batchSize]*packetBuffer
}

var _ connection = &oobConn{}
Expand Down Expand Up @@ -94,23 +113,41 @@ func newConn(c OOBCapablePacketConn) (*oobConn, error) {
return nil, errors.New("activating packet info failed for both IPv4 and IPv6")
}
}
return &oobConn{
oobConn := &oobConn{
OOBCapablePacketConn: c,
oobBuffer: make([]byte, 128),
}, nil
batchConn: ipv4.NewPacketConn(c),
messages: make([]ipv4.Message, batchSize),
readPos: batchSize,
}
for i := 0; i < batchSize; i++ {
oobConn.messages[i].OOB = make([]byte, oobBufferSize)
}
return oobConn, nil
}

func (c *oobConn) ReadPacket() (*receivedPacket, error) {
buffer := getPacketBuffer()
// The packet size should not exceed protocol.MaxPacketBufferSize bytes
// If it does, we only read a truncated packet, which will then end up undecryptable
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.oobBuffer = c.oobBuffer[:cap(c.oobBuffer)]
n, oobn, _, addr, err := c.OOBCapablePacketConn.ReadMsgUDP(buffer.Data, c.oobBuffer)
if err != nil {
return nil, err
if len(c.messages) == int(c.readPos) { // all messages read. Read the next batch of messages.
c.messages = c.messages[:batchSize]
// replace buffers data buffers up to the packet that has been consumed during the last ReadBatch call
for i := uint8(0); i < c.readPos; i++ {
buffer := getPacketBuffer()
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.buffers[i] = buffer
c.messages[i].Buffers = [][]byte{c.buffers[i].Data}
}
c.readPos = 0

n, err := c.batchConn.ReadBatch(c.messages, 0)
if n == 0 || err != nil {
return nil, err
}
c.messages = c.messages[:n]
}
ctrlMsgs, err := unix.ParseSocketControlMessage(c.oobBuffer[:oobn])

msg := c.messages[c.readPos]
buffer := c.buffers[c.readPos]
c.readPos++
ctrlMsgs, err := unix.ParseSocketControlMessage(msg.OOB[:msg.NN])
if err != nil {
return nil, err
}
Expand All @@ -129,13 +166,15 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// struct in_addr ipi_addr; /* Header Destination
// address */
// };
ip := make([]byte, 4)
if len(ctrlMsg.Data) == 12 {
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data)
destIP = net.IP(ctrlMsg.Data[8:12])
copy(ip, ctrlMsg.Data[8:12])
} else if len(ctrlMsg.Data) == 4 {
// FreeBSD
destIP = net.IP(ctrlMsg.Data)
copy(ip, ctrlMsg.Data)
}
destIP = net.IP(ip)
}
}
if ctrlMsg.Header.Level == unix.IPPROTO_IPV6 {
Expand All @@ -148,7 +187,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// unsigned int ipi6_ifindex; /* send/recv interface index */
// };
if len(ctrlMsg.Data) == 20 {
destIP = net.IP(ctrlMsg.Data[:16])
ip := make([]byte, 16)
copy(ip, ctrlMsg.Data[:16])
destIP = net.IP(ip)
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data[16:])
}
}
Expand All @@ -162,9 +203,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
}
}
return &receivedPacket{
remoteAddr: addr,
remoteAddr: msg.Addr,
rcvTime: time.Now(),
data: buffer.Data[:n],
data: msg.Buffers[0][:msg.N],
ecn: ecn,
info: info,
buffer: buffer,
Expand Down
46 changes: 44 additions & 2 deletions conn_oob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package quic

import (
"fmt"
"net"
"time"

"golang.org/x/net/ipv4"
"golang.org/x/sys/unix"

"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"

Expand All @@ -21,14 +24,14 @@ var _ = Describe("OOB Conn Test", func() {
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP(network, addr)
Expect(err).ToNot(HaveOccurred())
ecnConn, err := newConn(udpConn)
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())

packetChan := make(chan *receivedPacket)
go func() {
defer GinkgoRecover()
for {
p, err := ecnConn.ReadPacket()
p, err := oobConn.ReadPacket()
if err != nil {
return
}
Expand Down Expand Up @@ -197,4 +200,43 @@ var _ = Describe("OOB Conn Test", func() {
Expect(p.info.addr).To(Equal(ip6))
})
})

Context("Batch Reading", func() {
var batchConn *MockBatchConn

BeforeEach(func() {
batchConn = NewMockBatchConn(mockCtrl)
})

It("reads multiple messages in one batch", func() {
const numMsgRead = batchSize/2 + 1
var counter int
batchConn.EXPECT().ReadBatch(gomock.Any(), gomock.Any()).DoAndReturn(func(ms []ipv4.Message, flags int) (int, error) {
Expect(ms).To(HaveLen(batchSize))
for i := 0; i < numMsgRead; i++ {
Expect(ms[i].Buffers).To(HaveLen(1))
Expect(ms[i].Buffers[0]).To(HaveLen(int(protocol.MaxPacketBufferSize)))
data := []byte(fmt.Sprintf("message %d", counter))
counter++
ms[i].Buffers[0] = data
ms[i].N = len(data)
}
return numMsgRead, nil
}).Times(2)

addr, err := net.ResolveUDPAddr("udp", "localhost:0")
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP("udp", addr)
Expect(err).ToNot(HaveOccurred())
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())
oobConn.batchConn = batchConn

for i := 0; i < batchSize+1; i++ {
p, err := oobConn.ReadPacket()
Expect(err).ToNot(HaveOccurred())
Expect(string(p.data)).To(Equal(fmt.Sprintf("message %d", i)))
}
})
})
})
50 changes: 50 additions & 0 deletions mock_batch_conn_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mockgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ package quic
//go:generate sh -c "./mockgen_private.sh quic mock_unknown_packet_handler_test.go github.com/lucas-clemente/quic-go unknownPacketHandler"
//go:generate sh -c "./mockgen_private.sh quic mock_packet_handler_manager_test.go github.com/lucas-clemente/quic-go packetHandlerManager"
//go:generate sh -c "./mockgen_private.sh quic mock_multiplexer_test.go github.com/lucas-clemente/quic-go multiplexer"
//go:generate sh -c "./mockgen_private.sh quic mock_batch_conn_test.go github.com/lucas-clemente/quic-go batchConn"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_token_store_test.go github.com/lucas-clemente/quic-go TokenStore && goimports -w mock_token_store_test.go"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_packetconn_test.go net PacketConn && goimports -w mock_packetconn_test.go"

0 comments on commit 2054732

Please sign in to comment.