Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write buffer pooling #192

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ type Dialer struct {
// If Jar is nil, cookies are not sent in requests and ignored
// in responses.
Jar http.CookieJar

// WriteBufferPool specifies a pool of buffers to use for write methods. A
// nil value will cause a buffer to be allocated per connection. It is
// recommended to use a buffer pool for applications that have a large number
// of connections and a modest volume of writes. The provided buffer pool
// must not implement a new value instatiator (e.g. Do not implement
// sync.Pool.New()), and must not be shared across connections that have
// different values of WriteBufferSize.
WriteBufferPool BufferPool
}

var errMalformedURL = errors.New("malformed ws or wss URL")
Expand Down Expand Up @@ -339,6 +348,7 @@ func (d *Dialer) Dial(urlStr string, requestHeader http.Header) (*Conn, *http.Re
}

conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize)
conn.writePool = d.WriteBufferPool

if err := req.Write(netConn); err != nil {
return nil, nil, err
Expand Down
53 changes: 47 additions & 6 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ type CloseError struct {
Text string
}

// BufferPool represents a pool of buffers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that sync.Pool implements the interface. Also state that a shared pool must support concurrent Get() and Put().

type BufferPool interface {
Get() interface{}
Put(interface{})
}

func (e *CloseError) Error() string {
s := []byte("websocket: close ")
s = strconv.AppendInt(s, int64(e.Code), 10)
Expand Down Expand Up @@ -225,11 +231,13 @@ type Conn struct {
subprotocol string

// Write fields
mu chan bool // used as mutex to protect write to conn
writeBuf []byte // frame is constructed in this buffer.
writeDeadline time.Time
writer io.WriteCloser // the current writer returned to the application
isWriting bool // for best-effort concurrent write detection
mu chan bool // used as mutex to protect write to conn
writeBuf []byte // frame is constructed in this buffer.
writeBufferSize int
writePool BufferPool
writeDeadline time.Time
writer io.WriteCloser // the current writer returned to the application
isWriting bool // for best-effort concurrent write detection

writeErrMu sync.Mutex
writeErr error
Expand Down Expand Up @@ -276,7 +284,7 @@ func newConn(conn net.Conn, isServer bool, readBufferSize, writeBufferSize int)
conn: conn,
mu: mu,
readFinal: true,
writeBuf: make([]byte, writeBufferSize+maxFrameHeaderSize),
writeBufferSize: writeBufferSize,
enableWriteCompression: true,
}
c.SetCloseHandler(nil)
Expand Down Expand Up @@ -441,6 +449,9 @@ func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
frameType: messageType,
pos: maxFrameHeaderSize,
}
if err := c.acquireWriteBuf(); err != nil {
return nil, err
}
c.writer = mw
if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
w, err := c.newCompressionWriter(c.writer)
Expand Down Expand Up @@ -644,6 +655,7 @@ func (w *messageWriter) ReadFrom(r io.Reader) (nn int64, err error) {
}

func (w *messageWriter) Close() error {
defer w.c.releaseWriteBuf()
if w.err != nil {
return w.err
}
Expand All @@ -666,6 +678,9 @@ func (c *Conn) WriteMessage(messageType int, data []byte) error {
return err
}
mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
if err := c.acquireWriteBuf(); err != nil {
return err
}
n := copy(c.writeBuf[mw.pos:], data)
mw.pos += n
data = data[n:]
Expand Down Expand Up @@ -1041,3 +1056,29 @@ func FormatCloseMessage(closeCode int, text string) []byte {
copy(buf[2:], text)
return buf
}

func (c *Conn) acquireWriteBuf() error {
if c.writeBuf != nil {
return nil
}
n := c.writeBufferSize + maxFrameHeaderSize
if c.writePool != nil {
if i := c.writePool.Get(); i != nil {
p, ok := i.([]byte)
if !ok || len(p) != n {
return errors.New("bad value from write buffer pool")
}
c.writeBuf = p
return nil
}
}
c.writeBuf = make([]byte, n)
return nil
}

func (c *Conn) releaseWriteBuf() {
if c.writePool != nil && c.writeBuf != nil {
c.writePool.Put(c.writeBuf)
c.writeBuf = nil
}
}
15 changes: 15 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ type Upgrader struct {
// guarantee that compression will be supported. Currently only "no context
// takeover" modes are supported.
EnableCompression bool

// WriteBufferPool specifies a pool of buffers to use for write methods. A
// nil value will cause a buffer to be allocated per connection. It is
// recommended to use a buffer pool for applications that have a large number
// of connections and a modest volume of writes. The provided buffer pool
// must not implement a new value instatiator (e.g. Do not implement
// sync.Pool.New()), and must not be shared across connections that have
// different values of WriteBufferSize.
WriteBufferPool BufferPool
}

func (u *Upgrader) returnError(w http.ResponseWriter, r *http.Request, status int, reason string) (*Conn, error) {
Expand Down Expand Up @@ -173,13 +182,19 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
}

c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize)
c.writePool = u.WriteBufferPool
c.subprotocol = subprotocol

if compress {
c.newCompressionWriter = compressNoContextTakeover
c.newDecompressionReader = decompressNoContextTakeover
}

if err = c.acquireWriteBuf(); err != nil {
netConn.Close()
return nil, err
}
defer c.releaseWriteBuf()
p := c.writeBuf[:0]
p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
p = append(p, computeAcceptKey(challengeKey)...)
Expand Down