Skip to content

Commit

Permalink
rpc_util: Introduce user created shared recv buffer pool option
Browse files Browse the repository at this point in the history
  • Loading branch information
hueypark committed Feb 25, 2023
1 parent e746250 commit 711db78
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 108 deletions.
11 changes: 5 additions & 6 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type dialOptions struct {
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
useSharedRecvBuffers bool
sharedRecvBufferPool sharedRecvBufferPool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -637,16 +637,15 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
})
}

// WithSharedRecvBuffers returns a DialOption that specifies whether to use
// a bytes pool for parsing. Setting this to true will reduce the memory allocation
// in the parser.
// WithSharedRecvBufferPool returns a DialOption that specifies shared buffer pool
// for parsing. Setting this will reduce the memory allocation in the parser.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithSharedRecvBuffers(ok bool) DialOption {
func WithSharedRecvBufferPool(bufferPool sharedRecvBufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.useSharedRecvBuffers = ok
o.sharedRecvBufferPool = bufferPool
})
}
103 changes: 10 additions & 93 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ type parser struct {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

// useSharedRecvBuffers indicates whether to use shared receive buffers.
useSharedRecvBuffers bool
// sharedRecvBufferPool is the pool of shared receive buffers.
sharedRecvBufferPool sharedRecvBufferPool
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -577,8 +577,8 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
if p.useSharedRecvBuffers {
msg = pool.Get(int(length))
if p.sharedRecvBufferPool != nil {
msg = p.sharedRecvBufferPool.Get(int(length))
} else {
msg = make([]byte, int(length))
}
Expand Down Expand Up @@ -768,7 +768,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
}
if payInfo != nil {
if p.useSharedRecvBuffers {
if p.sharedRecvBufferPool != nil {
if len(buf) != 0 {
payInfo.uncompressedBytes = make([]byte, len(buf))
copy(payInfo.uncompressedBytes, buf)
Expand All @@ -777,8 +777,8 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
payInfo.uncompressedBytes = buf
}
}
if p.useSharedRecvBuffers {
pool.Put(&buf)
if p.sharedRecvBufferPool != nil {
p.sharedRecvBufferPool.Put(&buf)
}
return nil
}
Expand Down Expand Up @@ -930,90 +930,7 @@ const (

const grpcUA = "grpc-go/" + Version

const (
level0PoolMaxSize = 16
level1PoolMaxSize = level0PoolMaxSize * 16
level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB
level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB
level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB
)

type bytesPools struct {
pool0 bytesPool
pool1 bytesPool
pool2 bytesPool
pool3 bytesPool
pool4 bytesPool
poolMax bytesPool
}

func (p *bytesPools) Get(size int) []byte {
switch {
case size <= level0PoolMaxSize:
return p.pool0.Get(size)
case size <= level1PoolMaxSize:
return p.pool1.Get(size)
case size <= level2PoolMaxSize:
return p.pool2.Get(size)
case size <= level3PoolMaxSize:
return p.pool3.Get(size)
case size <= level4PoolMaxSize:
return p.pool4.Get(size)
default:
return p.poolMax.Get(size)
}
}

func (p *bytesPools) Put(bs *[]byte) {
switch size := cap(*bs); {
case size <= level0PoolMaxSize:
p.pool0.Put(bs)
case size <= level1PoolMaxSize:
p.pool1.Put(bs)
case size <= level2PoolMaxSize:
p.pool2.Put(bs)
case size <= level3PoolMaxSize:
p.pool3.Put(bs)
case size <= level4PoolMaxSize:
p.pool4.Put(bs)
default:
p.poolMax.Put(bs)
}
}

type bytesPool struct {
sync.Pool
}

func makeBytesPool() bytesPool {
return bytesPool{
sync.Pool{
New: func() interface{} {
return new([]byte)
},
},
}
}

func (p *bytesPool) Get(size int) []byte {
bs := p.Pool.Get().(*[]byte)
if cap(*bs) < size {
*bs = make([]byte, size)
return *bs
}

return (*bs)[:size]
}

func (p *bytesPool) Put(bs *[]byte) {
p.Pool.Put(bs)
}

var pool = bytesPools{
pool0: makeBytesPool(),
pool1: makeBytesPool(),
pool2: makeBytesPool(),
pool3: makeBytesPool(),
pool4: makeBytesPool(),
poolMax: makeBytesPool(),
type sharedRecvBufferPool interface {
Get(length int) []byte
Put(*[]byte)
}
14 changes: 7 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type serverOptions struct {
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
useSharedRecvBuffers bool
sharedRecvBufferPool sharedRecvBufferPool
}

var defaultServerOptions = serverOptions{
Expand Down Expand Up @@ -553,16 +553,16 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}

// SharedRecvBuffers returns a ServerOption that sets whether to use a bytes pool
// for the parser. Setting this to true will reduce the memory allocation in the parser.
// SharedRecvBufferPool returns a DialOption that specifies shared buffer pool
// for parsing. Setting this will reduce the memory allocation in the parser.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedRecvBuffers(ok bool) ServerOption {
func SharedRecvBufferPool(bufferPool sharedRecvBufferPool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.useSharedRecvBuffers = ok
o.sharedRecvBufferPool = bufferPool
})
}

Expand Down Expand Up @@ -1310,7 +1310,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
d, err := recvAndDecompress(&parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ctx: ctx,
t: t,
s: stream,
p: &parser{r: stream},
p: &parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error {
return toRPCErr(nse.Err)
}
a.s = s
a.p = &parser{r: s, useSharedRecvBuffers: a.cs.cc.dopts.useSharedRecvBuffers}
a.p = &parser{r: s, sharedRecvBufferPool: a.cs.cc.dopts.sharedRecvBufferPool}
return nil
}

Expand Down Expand Up @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
as.p = &parser{r: s, useSharedRecvBuffers: ac.dopts.useSharedRecvBuffers}
as.p = &parser{r: s, sharedRecvBufferPool: ac.dopts.sharedRecvBufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
Expand Down

0 comments on commit 711db78

Please sign in to comment.