From 711db78346a7cbee693dd11f7751d64c9ab49600 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 25 Feb 2023 17:26:12 +0900 Subject: [PATCH] rpc_util: Introduce user created shared recv buffer pool option --- dialoptions.go | 11 +++--- rpc_util.go | 103 +++++-------------------------------------------- server.go | 14 +++---- stream.go | 4 +- 4 files changed, 24 insertions(+), 108 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 69f2f52d114b..fe1172172edc 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -76,7 +76,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder - useSharedRecvBuffers bool + sharedRecvBufferPool sharedRecvBufferPool } // DialOption configures how we set up the connection. @@ -637,16 +637,15 @@ func WithResolvers(rs ...resolver.Builder) DialOption { }) } -// WithSharedRecvBuffers returns a DialOption that specifies whether to use -// a bytes pool for parsing. Setting this to true will reduce the memory allocation -// in the parser. +// WithSharedRecvBufferPool returns a DialOption that specifies shared buffer pool +// for parsing. Setting this will reduce the memory allocation in the parser. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithSharedRecvBuffers(ok bool) DialOption { +func WithSharedRecvBufferPool(bufferPool sharedRecvBufferPool) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.useSharedRecvBuffers = ok + o.sharedRecvBufferPool = bufferPool }) } diff --git a/rpc_util.go b/rpc_util.go index 007c62a2d10f..f0336b95936c 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -542,8 +542,8 @@ type parser struct { // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte - // useSharedRecvBuffers indicates whether to use shared receive buffers. - useSharedRecvBuffers bool + // sharedRecvBufferPool is the pool of shared receive buffers. + sharedRecvBufferPool sharedRecvBufferPool } // recvMsg reads a complete gRPC message from the stream. @@ -577,8 +577,8 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - if p.useSharedRecvBuffers { - msg = pool.Get(int(length)) + if p.sharedRecvBufferPool != nil { + msg = p.sharedRecvBufferPool.Get(int(length)) } else { msg = make([]byte, int(length)) } @@ -768,7 +768,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } if payInfo != nil { - if p.useSharedRecvBuffers { + if p.sharedRecvBufferPool != nil { if len(buf) != 0 { payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) @@ -777,8 +777,8 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf payInfo.uncompressedBytes = buf } } - if p.useSharedRecvBuffers { - pool.Put(&buf) + if p.sharedRecvBufferPool != nil { + p.sharedRecvBufferPool.Put(&buf) } return nil } @@ -930,90 +930,7 @@ const ( const grpcUA = "grpc-go/" + Version -const ( - level0PoolMaxSize = 16 - level1PoolMaxSize = level0PoolMaxSize * 16 - level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB - level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB - level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB -) - -type bytesPools struct { - pool0 bytesPool - pool1 bytesPool - pool2 bytesPool - pool3 bytesPool - pool4 bytesPool - poolMax bytesPool -} - -func (p *bytesPools) Get(size int) []byte { - switch { - case size <= level0PoolMaxSize: - return p.pool0.Get(size) - case size <= level1PoolMaxSize: - return p.pool1.Get(size) - case size <= level2PoolMaxSize: - return p.pool2.Get(size) - case size <= level3PoolMaxSize: - return p.pool3.Get(size) - case size <= level4PoolMaxSize: - return p.pool4.Get(size) - default: - return p.poolMax.Get(size) - } -} - -func (p *bytesPools) Put(bs *[]byte) { - switch size := cap(*bs); { - case size <= level0PoolMaxSize: - p.pool0.Put(bs) - case size <= level1PoolMaxSize: - p.pool1.Put(bs) - case size <= level2PoolMaxSize: - p.pool2.Put(bs) - case size <= level3PoolMaxSize: - p.pool3.Put(bs) - case size <= level4PoolMaxSize: - p.pool4.Put(bs) - default: - p.poolMax.Put(bs) - } -} - -type bytesPool struct { - sync.Pool -} - -func makeBytesPool() bytesPool { - return bytesPool{ - sync.Pool{ - New: func() interface{} { - return new([]byte) - }, - }, - } -} - -func (p *bytesPool) Get(size int) []byte { - bs := p.Pool.Get().(*[]byte) - if cap(*bs) < size { - *bs = make([]byte, size) - return *bs - } - - return (*bs)[:size] -} - -func (p *bytesPool) Put(bs *[]byte) { - p.Pool.Put(bs) -} - -var pool = bytesPools{ - pool0: makeBytesPool(), - pool1: makeBytesPool(), - pool2: makeBytesPool(), - pool3: makeBytesPool(), - pool4: makeBytesPool(), - poolMax: makeBytesPool(), +type sharedRecvBufferPool interface { + Get(length int) []byte + Put(*[]byte) } diff --git a/server.go b/server.go index 0f320070c4dc..c723dcf3cbdf 100644 --- a/server.go +++ b/server.go @@ -174,7 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 - useSharedRecvBuffers bool + sharedRecvBufferPool sharedRecvBufferPool } var defaultServerOptions = serverOptions{ @@ -553,16 +553,16 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } -// SharedRecvBuffers returns a ServerOption that sets whether to use a bytes pool -// for the parser. Setting this to true will reduce the memory allocation in the parser. +// SharedRecvBufferPool returns a DialOption that specifies shared buffer pool +// for parsing. Setting this will reduce the memory allocation in the parser. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func SharedRecvBuffers(ok bool) ServerOption { +func SharedRecvBufferPool(bufferPool sharedRecvBufferPool) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.useSharedRecvBuffers = ok + o.sharedRecvBufferPool = bufferPool }) } @@ -1310,7 +1310,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if len(shs) != 0 || len(binlogs) != 0 { payInfo = &payloadInfo{} } - d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) + d, err := recvAndDecompress(&parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) if err != nil { if e := t.WriteStatus(stream, status.Convert(err)); e != nil { channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) @@ -1515,7 +1515,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ctx: ctx, t: t, s: stream, - p: &parser{r: stream}, + p: &parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, codec: s.getCodec(stream.ContentSubtype()), maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, diff --git a/stream.go b/stream.go index b46e48f2c8c2..2c87e28d4927 100644 --- a/stream.go +++ b/stream.go @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s, useSharedRecvBuffers: a.cs.cc.dopts.useSharedRecvBuffers} + a.p = &parser{r: s, sharedRecvBufferPool: a.cs.cc.dopts.sharedRecvBufferPool} return nil } @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s, useSharedRecvBuffers: ac.dopts.useSharedRecvBuffers} + as.p = &parser{r: s, sharedRecvBufferPool: ac.dopts.sharedRecvBufferPool} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the