From 493be5db050dbbc964b7ae90797057228ffe6daa Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 25 Jan 2023 20:43:52 +0900 Subject: [PATCH 01/36] rpc_util: add bytes pool for the parser --- rpc_util.go | 49 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index cb7020ebecd7..fa79293e3b1e 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -574,9 +574,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead - // of making it for each message: - msg = make([]byte, int(length)) + msg = pool.Get(int(length)) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -689,12 +687,12 @@ type payloadInfo struct { } func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { - pf, d, err := p.recvMsg(maxReceiveMessageSize) + pf, buf, err := p.recvMsg(maxReceiveMessageSize) if err != nil { return nil, err } if payInfo != nil { - payInfo.wireLength = len(d) + payInfo.wireLength = len(buf) } if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { @@ -706,10 +704,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor, // use this decompressor as the default. if dc != nil { - d, err = dc.Do(bytes.NewReader(d)) - size = len(d) + buf, err = dc.Do(bytes.NewReader(buf)) + size = len(buf) } else { - d, size, err = decompress(compressor, d, maxReceiveMessageSize) + buf, size, err = decompress(compressor, buf, maxReceiveMessageSize) } if err != nil { return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err) @@ -720,7 +718,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize) } } - return d, nil + return buf, nil } // Using compressor, decompress d, returning data and size. @@ -755,16 +753,18 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize // dc takes precedence over compressor. // TODO(dfawley): wrap the old compressor/decompressor using the new API? func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { - d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) + buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) if err != nil { return err } - if err := c.Unmarshal(d, m); err != nil { + if err := c.Unmarshal(buf, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } if payInfo != nil { - payInfo.uncompressedBytes = d + payInfo.uncompressedBytes = make([]byte, len(buf)) + copy(payInfo.uncompressedBytes, buf) } + pool.Put(&buf) return nil } @@ -914,3 +914,28 @@ const ( ) const grpcUA = "grpc-go/" + Version + +type bytesPool struct { + sync.Pool +} + +func (p *bytesPool) Get(size int) []byte { + bs := p.Pool.Get().(*[]byte) + if cap(*bs) < size { + *bs = make([]byte, size) + } + + return (*bs)[:size] +} + +func (p *bytesPool) Put(bs *[]byte) { + p.Pool.Put(bs) +} + +var pool = bytesPool{ + sync.Pool{ + New: func() interface{} { + return new([]byte) + }, + }, +} From ecdb6d229520e6ed0b5830243f81ed98df2f250e Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 25 Jan 2023 21:53:58 +0900 Subject: [PATCH 02/36] rpc_util: handle empty buffer case for copy payInfo.uncompressedBytes --- rpc_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc_util.go b/rpc_util.go index fa79293e3b1e..622b3403d9e2 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -760,7 +760,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf if err := c.Unmarshal(buf, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } - if payInfo != nil { + if payInfo != nil && len(buf) != 0 { payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) } From cb8827d8caae66a209b3cbf6d4e430f5d69b0a5d Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 1 Feb 2023 01:13:11 +0900 Subject: [PATCH 03/36] rpc_util: Use a pool of bytes with varying sizes for the parser --- rpc_util.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index 622b3403d9e2..5d0a41136389 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -915,27 +915,90 @@ const ( const grpcUA = "grpc-go/" + Version +const ( + level0PoolMaxSize = 16 + level1PoolMaxSize = level0PoolMaxSize * 16 + level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB + level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB + level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB +) + +type bytesPools struct { + pool0 bytesPool + pool1 bytesPool + pool2 bytesPool + pool3 bytesPool + pool4 bytesPool + poolMax bytesPool +} + +func (p *bytesPools) Get(size int) []byte { + switch { + case size <= level0PoolMaxSize: + return p.pool0.Get(size) + case size <= level1PoolMaxSize: + return p.pool1.Get(size) + case size <= level2PoolMaxSize: + return p.pool2.Get(size) + case size <= level3PoolMaxSize: + return p.pool3.Get(size) + case size <= level4PoolMaxSize: + return p.pool4.Get(size) + default: + return p.poolMax.Get(size) + } +} + +func (p *bytesPools) Put(bs *[]byte) { + switch size := cap(*bs); { + case size <= level0PoolMaxSize: + p.pool0.Put(bs) + case size <= level1PoolMaxSize: + p.pool1.Put(bs) + case size <= level2PoolMaxSize: + p.pool2.Put(bs) + case size <= level3PoolMaxSize: + p.pool3.Put(bs) + case size <= level4PoolMaxSize: + p.pool4.Put(bs) + default: + p.poolMax.Put(bs) + } +} + type bytesPool struct { sync.Pool } +func makeBytesPool() bytesPool { + return bytesPool{ + sync.Pool{ + New: func() interface{} { + return new([]byte) + }, + }, + } +} + func (p *bytesPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) if cap(*bs) < size { *bs = make([]byte, size) + return *bs + } else { + return (*bs)[:size] } - - return (*bs)[:size] } func (p *bytesPool) Put(bs *[]byte) { p.Pool.Put(bs) } -var pool = bytesPool{ - sync.Pool{ - New: func() interface{} { - return new([]byte) - }, - }, +var pool = bytesPools{ + pool0: makeBytesPool(), + pool1: makeBytesPool(), + pool2: makeBytesPool(), + pool3: makeBytesPool(), + pool4: makeBytesPool(), + poolMax: makeBytesPool(), } From bae37e3cae1d8eacd83e09c6077ee72fba78ec1c Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 1 Feb 2023 01:50:23 +0900 Subject: [PATCH 04/36] grpc: Add useBytesPoolForParser option --- dialoptions.go | 15 +++++++++++++++ rpc_util.go | 13 +++++++++++-- server.go | 14 ++++++++++++++ stream.go | 4 ++-- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 4866da101c60..025cca665243 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -76,6 +76,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder + useBytesPoolForParser bool } // DialOption configures how we set up the connection. @@ -635,3 +636,17 @@ func WithResolvers(rs ...resolver.Builder) DialOption { o.resolvers = append(o.resolvers, rs...) }) } + +// WithUseBytesPoolForParser returns a DialOption that specifies whether to use +// a bytes pool for parsing. Setting this to true will reduce the memory allocation +// in the parser. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithUseBytesPoolForParser(useBytesPoolForParser bool) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.useBytesPoolForParser = useBytesPoolForParser + }) +} diff --git a/rpc_util.go b/rpc_util.go index 5d0a41136389..2acd01d90e64 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -541,6 +541,9 @@ type parser struct { // The header of a gRPC message. Find more detail at // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte + + // useBytesPool indicates whether to use bytes pool to allocate + useBytesPool bool } // recvMsg reads a complete gRPC message from the stream. @@ -574,7 +577,11 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - msg = pool.Get(int(length)) + if p.useBytesPool { + msg = pool.Get(int(length)) + } else { + msg = make([]byte, int(length)) + } if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -764,7 +771,9 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) } - pool.Put(&buf) + if p.useBytesPool { + pool.Put(&buf) + } return nil } diff --git a/server.go b/server.go index d5a6e78be44d..a5bd2dbfac7a 100644 --- a/server.go +++ b/server.go @@ -174,6 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 + useBytesPoolForParser bool } var defaultServerOptions = serverOptions{ @@ -552,6 +553,19 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } +// UseBytesPoolForParser returns a ServerOption that sets whether to use a bytes pool +// for the parser. Setting this to true will reduce the memory allocation in the parser. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func UseBytesPoolForParser(useBytesPoolForParser bool) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.useBytesPoolForParser = useBytesPoolForParser + }) +} + // serverWorkerResetThreshold defines how often the stack must be reset. Every // N requests, by spawning a new goroutine in its place, a worker can reset its // stack so that large stacks don't live in memory forever. 2^16 should allow diff --git a/stream.go b/stream.go index 93231af2ac56..57c709e89aec 100644 --- a/stream.go +++ b/stream.go @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s} + a.p = &parser{r: s, useBytesPool: a.cs.cc.dopts.useBytesPoolForParser} return nil } @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s} + as.p = &parser{r: s, useBytesPool: ac.dopts.useBytesPoolForParser} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the From cbaef54d582528b628acc40d6ac89519279446fa Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 1 Feb 2023 02:01:03 +0900 Subject: [PATCH 05/36] rpc_util: Fix a. vet error --- rpc_util.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index 2acd01d90e64..4e91c9126963 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -994,9 +994,9 @@ func (p *bytesPool) Get(size int) []byte { if cap(*bs) < size { *bs = make([]byte, size) return *bs - } else { - return (*bs)[:size] } + + return (*bs)[:size] } func (p *bytesPool) Put(bs *[]byte) { From f9730dd2fd5793abc0d4ca70b9893b8823442889 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 25 Feb 2023 13:06:47 +0900 Subject: [PATCH 06/36] rpc_util: Copy the buffer only if the pool option is enabled --- rpc_util.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index 4e91c9126963..821489ad1bb3 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -767,9 +767,15 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf if err := c.Unmarshal(buf, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } - if payInfo != nil && len(buf) != 0 { - payInfo.uncompressedBytes = make([]byte, len(buf)) - copy(payInfo.uncompressedBytes, buf) + if payInfo != nil { + if p.useBytesPool { + if len(buf) != 0 { + payInfo.uncompressedBytes = make([]byte, len(buf)) + copy(payInfo.uncompressedBytes, buf) + } + } else { + payInfo.uncompressedBytes = buf + } } if p.useBytesPool { pool.Put(&buf) From e7462501868ba7985b6e62593448e6ef5362be63 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 25 Feb 2023 13:47:36 +0900 Subject: [PATCH 07/36] rpc_util: Rename the option to `useSharedRecvBuffers` --- dialoptions.go | 8 ++++---- rpc_util.go | 10 +++++----- server.go | 8 ++++---- stream.go | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 025cca665243..69f2f52d114b 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -76,7 +76,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder - useBytesPoolForParser bool + useSharedRecvBuffers bool } // DialOption configures how we set up the connection. @@ -637,7 +637,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption { }) } -// WithUseBytesPoolForParser returns a DialOption that specifies whether to use +// WithSharedRecvBuffers returns a DialOption that specifies whether to use // a bytes pool for parsing. Setting this to true will reduce the memory allocation // in the parser. // @@ -645,8 +645,8 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithUseBytesPoolForParser(useBytesPoolForParser bool) DialOption { +func WithSharedRecvBuffers(ok bool) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.useBytesPoolForParser = useBytesPoolForParser + o.useSharedRecvBuffers = ok }) } diff --git a/rpc_util.go b/rpc_util.go index 821489ad1bb3..007c62a2d10f 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -542,8 +542,8 @@ type parser struct { // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte - // useBytesPool indicates whether to use bytes pool to allocate - useBytesPool bool + // useSharedRecvBuffers indicates whether to use shared receive buffers. + useSharedRecvBuffers bool } // recvMsg reads a complete gRPC message from the stream. @@ -577,7 +577,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - if p.useBytesPool { + if p.useSharedRecvBuffers { msg = pool.Get(int(length)) } else { msg = make([]byte, int(length)) @@ -768,7 +768,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } if payInfo != nil { - if p.useBytesPool { + if p.useSharedRecvBuffers { if len(buf) != 0 { payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) @@ -777,7 +777,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf payInfo.uncompressedBytes = buf } } - if p.useBytesPool { + if p.useSharedRecvBuffers { pool.Put(&buf) } return nil diff --git a/server.go b/server.go index a5bd2dbfac7a..0f320070c4dc 100644 --- a/server.go +++ b/server.go @@ -174,7 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 - useBytesPoolForParser bool + useSharedRecvBuffers bool } var defaultServerOptions = serverOptions{ @@ -553,16 +553,16 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } -// UseBytesPoolForParser returns a ServerOption that sets whether to use a bytes pool +// SharedRecvBuffers returns a ServerOption that sets whether to use a bytes pool // for the parser. Setting this to true will reduce the memory allocation in the parser. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func UseBytesPoolForParser(useBytesPoolForParser bool) ServerOption { +func SharedRecvBuffers(ok bool) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.useBytesPoolForParser = useBytesPoolForParser + o.useSharedRecvBuffers = ok }) } diff --git a/stream.go b/stream.go index 57c709e89aec..b46e48f2c8c2 100644 --- a/stream.go +++ b/stream.go @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s, useBytesPool: a.cs.cc.dopts.useBytesPoolForParser} + a.p = &parser{r: s, useSharedRecvBuffers: a.cs.cc.dopts.useSharedRecvBuffers} return nil } @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s, useBytesPool: ac.dopts.useBytesPoolForParser} + as.p = &parser{r: s, useSharedRecvBuffers: ac.dopts.useSharedRecvBuffers} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the From 711db78346a7cbee693dd11f7751d64c9ab49600 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 25 Feb 2023 17:26:12 +0900 Subject: [PATCH 08/36] rpc_util: Introduce user created shared recv buffer pool option --- dialoptions.go | 11 +++--- rpc_util.go | 103 +++++-------------------------------------------- server.go | 14 +++---- stream.go | 4 +- 4 files changed, 24 insertions(+), 108 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 69f2f52d114b..fe1172172edc 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -76,7 +76,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder - useSharedRecvBuffers bool + sharedRecvBufferPool sharedRecvBufferPool } // DialOption configures how we set up the connection. @@ -637,16 +637,15 @@ func WithResolvers(rs ...resolver.Builder) DialOption { }) } -// WithSharedRecvBuffers returns a DialOption that specifies whether to use -// a bytes pool for parsing. Setting this to true will reduce the memory allocation -// in the parser. +// WithSharedRecvBufferPool returns a DialOption that specifies shared buffer pool +// for parsing. Setting this will reduce the memory allocation in the parser. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithSharedRecvBuffers(ok bool) DialOption { +func WithSharedRecvBufferPool(bufferPool sharedRecvBufferPool) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.useSharedRecvBuffers = ok + o.sharedRecvBufferPool = bufferPool }) } diff --git a/rpc_util.go b/rpc_util.go index 007c62a2d10f..f0336b95936c 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -542,8 +542,8 @@ type parser struct { // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte - // useSharedRecvBuffers indicates whether to use shared receive buffers. - useSharedRecvBuffers bool + // sharedRecvBufferPool is the pool of shared receive buffers. + sharedRecvBufferPool sharedRecvBufferPool } // recvMsg reads a complete gRPC message from the stream. @@ -577,8 +577,8 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - if p.useSharedRecvBuffers { - msg = pool.Get(int(length)) + if p.sharedRecvBufferPool != nil { + msg = p.sharedRecvBufferPool.Get(int(length)) } else { msg = make([]byte, int(length)) } @@ -768,7 +768,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } if payInfo != nil { - if p.useSharedRecvBuffers { + if p.sharedRecvBufferPool != nil { if len(buf) != 0 { payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) @@ -777,8 +777,8 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf payInfo.uncompressedBytes = buf } } - if p.useSharedRecvBuffers { - pool.Put(&buf) + if p.sharedRecvBufferPool != nil { + p.sharedRecvBufferPool.Put(&buf) } return nil } @@ -930,90 +930,7 @@ const ( const grpcUA = "grpc-go/" + Version -const ( - level0PoolMaxSize = 16 - level1PoolMaxSize = level0PoolMaxSize * 16 - level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB - level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB - level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB -) - -type bytesPools struct { - pool0 bytesPool - pool1 bytesPool - pool2 bytesPool - pool3 bytesPool - pool4 bytesPool - poolMax bytesPool -} - -func (p *bytesPools) Get(size int) []byte { - switch { - case size <= level0PoolMaxSize: - return p.pool0.Get(size) - case size <= level1PoolMaxSize: - return p.pool1.Get(size) - case size <= level2PoolMaxSize: - return p.pool2.Get(size) - case size <= level3PoolMaxSize: - return p.pool3.Get(size) - case size <= level4PoolMaxSize: - return p.pool4.Get(size) - default: - return p.poolMax.Get(size) - } -} - -func (p *bytesPools) Put(bs *[]byte) { - switch size := cap(*bs); { - case size <= level0PoolMaxSize: - p.pool0.Put(bs) - case size <= level1PoolMaxSize: - p.pool1.Put(bs) - case size <= level2PoolMaxSize: - p.pool2.Put(bs) - case size <= level3PoolMaxSize: - p.pool3.Put(bs) - case size <= level4PoolMaxSize: - p.pool4.Put(bs) - default: - p.poolMax.Put(bs) - } -} - -type bytesPool struct { - sync.Pool -} - -func makeBytesPool() bytesPool { - return bytesPool{ - sync.Pool{ - New: func() interface{} { - return new([]byte) - }, - }, - } -} - -func (p *bytesPool) Get(size int) []byte { - bs := p.Pool.Get().(*[]byte) - if cap(*bs) < size { - *bs = make([]byte, size) - return *bs - } - - return (*bs)[:size] -} - -func (p *bytesPool) Put(bs *[]byte) { - p.Pool.Put(bs) -} - -var pool = bytesPools{ - pool0: makeBytesPool(), - pool1: makeBytesPool(), - pool2: makeBytesPool(), - pool3: makeBytesPool(), - pool4: makeBytesPool(), - poolMax: makeBytesPool(), +type sharedRecvBufferPool interface { + Get(length int) []byte + Put(*[]byte) } diff --git a/server.go b/server.go index 0f320070c4dc..c723dcf3cbdf 100644 --- a/server.go +++ b/server.go @@ -174,7 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 - useSharedRecvBuffers bool + sharedRecvBufferPool sharedRecvBufferPool } var defaultServerOptions = serverOptions{ @@ -553,16 +553,16 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } -// SharedRecvBuffers returns a ServerOption that sets whether to use a bytes pool -// for the parser. Setting this to true will reduce the memory allocation in the parser. +// SharedRecvBufferPool returns a DialOption that specifies shared buffer pool +// for parsing. Setting this will reduce the memory allocation in the parser. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func SharedRecvBuffers(ok bool) ServerOption { +func SharedRecvBufferPool(bufferPool sharedRecvBufferPool) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.useSharedRecvBuffers = ok + o.sharedRecvBufferPool = bufferPool }) } @@ -1310,7 +1310,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if len(shs) != 0 || len(binlogs) != 0 { payInfo = &payloadInfo{} } - d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) + d, err := recvAndDecompress(&parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) if err != nil { if e := t.WriteStatus(stream, status.Convert(err)); e != nil { channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) @@ -1515,7 +1515,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ctx: ctx, t: t, s: stream, - p: &parser{r: stream}, + p: &parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, codec: s.getCodec(stream.ContentSubtype()), maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, diff --git a/stream.go b/stream.go index b46e48f2c8c2..2c87e28d4927 100644 --- a/stream.go +++ b/stream.go @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s, useSharedRecvBuffers: a.cs.cc.dopts.useSharedRecvBuffers} + a.p = &parser{r: s, sharedRecvBufferPool: a.cs.cc.dopts.sharedRecvBufferPool} return nil } @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s, useSharedRecvBuffers: ac.dopts.useSharedRecvBuffers} + as.p = &parser{r: s, sharedRecvBufferPool: ac.dopts.sharedRecvBufferPool} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the From c05feed5c94abfc3d94c8eac26af4f1be359ce03 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 25 Feb 2023 17:29:04 +0900 Subject: [PATCH 09/36] benchmark: Introduce shared receive buffers(nil and simple) feature --- benchmark/benchmain/main.go | 63 +++++++++++++++++++++++++++++++++++++ benchmark/stats/stats.go | 10 ++++-- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 3d054f358037..611a5a1a9a37 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -109,6 +109,7 @@ var ( clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list") + sharedRecvBufferPool = flags.StringWithAllowedValues("sharedRecvBufferPool", sharedRecvBufferPoolAll, "Configures the shared receive buffer pool. One of: nil, simple", allSharedRecvBufferPools) logger = grpclog.Component("benchmark") ) @@ -133,6 +134,10 @@ const ( networkModeLAN = "LAN" networkModeWAN = "WAN" networkLongHaul = "Longhaul" + // Shared recv buffer pool + sharedRecvBufferPoolNil = "nil" + sharedRecvBufferPoolSimple = "simple" + sharedRecvBufferPoolAll = "all" numStatsBuckets = 10 warmupCallCount = 10 @@ -144,6 +149,7 @@ var ( allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll} allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth} allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul} + allSharedRecvBufferPools = []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple, sharedRecvBufferPoolAll} defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. defaultReadKbps = []int{0, 10240} // if non-positive, infinite defaultReadMTU = []int{0} // if non-positive, infinite @@ -321,6 +327,15 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { if bf.ServerWriteBufferSize >= 0 { sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize)) } + switch bf.SharedRecvBufferPool { + case sharedRecvBufferPoolNil: + // Do nothing. + case sharedRecvBufferPoolSimple: + opts = append(opts, grpc.WithSharedRecvBufferPool(newSimpleSharedRecvBufferPool())) + sopts = append(sopts, grpc.SharedRecvBufferPool(newSimpleSharedRecvBufferPool())) + default: + logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.SharedRecvBufferPool) + } sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -528,6 +543,7 @@ type featureOpts struct { clientWriteBufferSize []int serverReadBufferSize []int serverWriteBufferSize []int + sharedRecvBufferPools []string } // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each @@ -572,6 +588,8 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.serverReadBufferSize) case stats.ServerWriteBufferSize: featuresNum[i] = len(b.features.serverWriteBufferSize) + case stats.SharedRecvBufferPool: + featuresNum[i] = len(b.features.sharedRecvBufferPools) default: log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } @@ -638,6 +656,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]], ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]], ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]], + SharedRecvBufferPool: b.features.sharedRecvBufferPools[curPos[stats.SharedRecvBufferPool]], } if len(b.features.reqPayloadCurves) == 0 { f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] @@ -708,6 +727,7 @@ func processFlags() *benchOpts { clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...), serverReadBufferSize: append([]int(nil), *serverReadBufferSize...), serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...), + sharedRecvBufferPools: setSharedRecvBufferPool(*sharedRecvBufferPool), }, } @@ -783,6 +803,19 @@ func setCompressorMode(val string) []string { } } +func setSharedRecvBufferPool(val string) []string { + switch val { + case sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple: + return []string{val} + case sharedRecvBufferPoolAll: + return []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple} + default: + // This should never happen because a wrong value passed to this flag would + // be caught during flag.Parse(). + return []string{} + } +} + func main() { opts := processFlags() before(opts) @@ -882,3 +915,33 @@ type nopDecompressor struct{} func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return io.ReadAll(r) } func (nopDecompressor) Type() string { return compModeNop } + +// simpleSharedRecvBufferPool is a simple implementation of sharedRecvBufferPool. +type simpleSharedRecvBufferPool struct { + sync.Pool +} + +func newSimpleSharedRecvBufferPool() *simpleSharedRecvBufferPool { + return &simpleSharedRecvBufferPool{ + Pool: sync.Pool{ + New: func() interface{} { + bs := make([]byte, 0) + return &bs + }, + }, + } +} + +func (p *simpleSharedRecvBufferPool) Get(size int) []byte { + bs := p.Pool.Get().(*[]byte) + if cap(*bs) < size { + *bs = make([]byte, size) + return *bs + } + + return (*bs)[:size] +} + +func (p *simpleSharedRecvBufferPool) Put(bs *[]byte) { + p.Pool.Put(bs) +} diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index f5d8666648df..ec0cde056d44 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -56,6 +56,7 @@ const ( ClientWriteBufferSize ServerReadBufferSize ServerWriteBufferSize + SharedRecvBufferPool // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -121,6 +122,8 @@ type Features struct { ServerReadBufferSize int // ServerWriteBufferSize is the size of the server write buffer in bytes. If negative, use the default buffer size. ServerWriteBufferSize int + // SharedRecvBufferPool represents the shared recv buffer pool used. + SharedRecvBufferPool string } // String returns all the feature values as a string. @@ -139,12 +142,13 @@ func (f Features) String() string { return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+ "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+ - "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-", + "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+ + "sharedRecvBufferPool_%v-", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader, f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize, - f.ServerWriteBufferSize) + f.ServerWriteBufferSize, f.SharedRecvBufferPool) } // SharedFeatures returns the shared features as a pretty printable string. @@ -216,6 +220,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim)) case ServerWriteBufferSize: b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim)) + case SharedRecvBufferPool: + b.WriteString(fmt.Sprintf("SharedRecvBufferPool%v%v%v", sep, f.SharedRecvBufferPool, delim)) default: log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) } From b1c649610ee50fa5fe898e343e3fd40d6b728c88 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 15 Mar 2023 19:49:14 +0900 Subject: [PATCH 10/36] rpc_util: export a SharedBufferPool --- dialoptions.go | 4 ++-- rpc_util.go | 7 +------ server.go | 4 ++-- shared_buffer_pool.go | 7 +++++++ 4 files changed, 12 insertions(+), 10 deletions(-) create mode 100644 shared_buffer_pool.go diff --git a/dialoptions.go b/dialoptions.go index 7e6179430940..d97bdc92c68c 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -77,7 +77,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder - sharedRecvBufferPool sharedRecvBufferPool + sharedRecvBufferPool SharedBufferPool } // DialOption configures how we set up the connection. @@ -655,7 +655,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithSharedRecvBufferPool(bufferPool sharedRecvBufferPool) DialOption { +func WithSharedRecvBufferPool(bufferPool SharedBufferPool) DialOption { return newFuncDialOption(func(o *dialOptions) { o.sharedRecvBufferPool = bufferPool }) diff --git a/rpc_util.go b/rpc_util.go index 1e89f2329a97..756a4151ae1d 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -579,7 +579,7 @@ type parser struct { header [5]byte // sharedRecvBufferPool is the pool of shared receive buffers. - sharedRecvBufferPool sharedRecvBufferPool + sharedRecvBufferPool SharedBufferPool } // recvMsg reads a complete gRPC message from the stream. @@ -966,8 +966,3 @@ const ( ) const grpcUA = "grpc-go/" + Version - -type sharedRecvBufferPool interface { - Get(length int) []byte - Put(*[]byte) -} diff --git a/server.go b/server.go index 54a1215cda73..c8096dd3884c 100644 --- a/server.go +++ b/server.go @@ -175,7 +175,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 - sharedRecvBufferPool sharedRecvBufferPool + sharedRecvBufferPool SharedBufferPool } var defaultServerOptions = serverOptions{ @@ -561,7 +561,7 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func SharedRecvBufferPool(bufferPool sharedRecvBufferPool) ServerOption { +func SharedRecvBufferPool(bufferPool SharedBufferPool) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.sharedRecvBufferPool = bufferPool }) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go new file mode 100644 index 000000000000..af61e62f632d --- /dev/null +++ b/shared_buffer_pool.go @@ -0,0 +1,7 @@ +package grpc + +// SharedBufferPool is a pool of buffers that can be shared. +type SharedBufferPool interface { + Get(length int) []byte + Put(*[]byte) +} From 086dd289b748968ed5bdbc1fc90da97f6d7e539a Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 15 Mar 2023 19:51:29 +0900 Subject: [PATCH 11/36] rpc_util: improve comment for the option function --- dialoptions.go | 5 +++-- server.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index d97bdc92c68c..981cbfc6120c 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -648,8 +648,9 @@ func WithResolvers(rs ...resolver.Builder) DialOption { }) } -// WithSharedRecvBufferPool returns a DialOption that specifies shared buffer pool -// for parsing. Setting this will reduce the memory allocation in the parser. +// WithSharedRecvBufferPool returns a DialOption that configures the ClientConn +// to use the provided shared buffer pool for parsing incoming messages. Depending +// on the application's workload, this could result in reduced memory allocation. // // # Experimental // diff --git a/server.go b/server.go index c8096dd3884c..35ea570d93a0 100644 --- a/server.go +++ b/server.go @@ -554,8 +554,9 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } -// SharedRecvBufferPool returns a DialOption that specifies shared buffer pool -// for parsing. Setting this will reduce the memory allocation in the parser. +// SharedRecvBufferPool returns a ServerOption that configures the server +// to use the provided shared buffer pool for parsing incoming messages. Depending +// on the application's workload, this could result in reduced memory allocation. // // # Experimental // From d6c1d9743ccbf81133e4a03ebf0310d180947ef2 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 15 Mar 2023 20:12:14 +0900 Subject: [PATCH 12/36] rpc_util: introduce simple pool implementation for new user --- benchmark/benchmain/main.go | 34 ++-------------------------------- dialoptions.go | 3 +++ server.go | 3 +++ shared_buffer_pool.go | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 611a5a1a9a37..6bf2c0ba2a0a 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -331,8 +331,8 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { case sharedRecvBufferPoolNil: // Do nothing. case sharedRecvBufferPoolSimple: - opts = append(opts, grpc.WithSharedRecvBufferPool(newSimpleSharedRecvBufferPool())) - sopts = append(sopts, grpc.SharedRecvBufferPool(newSimpleSharedRecvBufferPool())) + opts = append(opts, grpc.WithSharedRecvBufferPool(grpc.NewSimpleSharedBufferPool())) + sopts = append(sopts, grpc.SharedRecvBufferPool(grpc.NewSimpleSharedBufferPool())) default: logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.SharedRecvBufferPool) } @@ -915,33 +915,3 @@ type nopDecompressor struct{} func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return io.ReadAll(r) } func (nopDecompressor) Type() string { return compModeNop } - -// simpleSharedRecvBufferPool is a simple implementation of sharedRecvBufferPool. -type simpleSharedRecvBufferPool struct { - sync.Pool -} - -func newSimpleSharedRecvBufferPool() *simpleSharedRecvBufferPool { - return &simpleSharedRecvBufferPool{ - Pool: sync.Pool{ - New: func() interface{} { - bs := make([]byte, 0) - return &bs - }, - }, - } -} - -func (p *simpleSharedRecvBufferPool) Get(size int) []byte { - bs := p.Pool.Get().(*[]byte) - if cap(*bs) < size { - *bs = make([]byte, size) - return *bs - } - - return (*bs)[:size] -} - -func (p *simpleSharedRecvBufferPool) Put(bs *[]byte) { - p.Pool.Put(bs) -} diff --git a/dialoptions.go b/dialoptions.go index 981cbfc6120c..a01aa3595390 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -652,6 +652,9 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // to use the provided shared buffer pool for parsing incoming messages. Depending // on the application's workload, this could result in reduced memory allocation. // +// If you are unsure about how to implement a memory pool but want to utilize one, +// begin with grpc.NewSimpleSharedBufferPool. +// // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a diff --git a/server.go b/server.go index 35ea570d93a0..086c45807ae3 100644 --- a/server.go +++ b/server.go @@ -558,6 +558,9 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // to use the provided shared buffer pool for parsing incoming messages. Depending // on the application's workload, this could result in reduced memory allocation. // +// If you are unsure about how to implement a memory pool but want to utilize one, +// begin with grpc.NewSimpleSharedBufferPool. +// // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index af61e62f632d..2d58b1eff516 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -1,7 +1,40 @@ package grpc +import "sync" + // SharedBufferPool is a pool of buffers that can be shared. type SharedBufferPool interface { Get(length int) []byte Put(*[]byte) } + +// NewSimpleSharedBufferPool creates a new SimpleSharedBufferPool. +func NewSimpleSharedBufferPool() SharedBufferPool { + return &simpleSharedBufferPool{ + Pool: sync.Pool{ + New: func() interface{} { + bs := make([]byte, 0) + return &bs + }, + }, + } +} + +// simpleSharedBufferPool is a simple implementation of SharedBufferPool. +type simpleSharedBufferPool struct { + sync.Pool +} + +func (p *simpleSharedBufferPool) Get(size int) []byte { + bs := p.Pool.Get().(*[]byte) + if cap(*bs) < size { + *bs = make([]byte, size) + return *bs + } + + return (*bs)[:size] +} + +func (p *simpleSharedBufferPool) Put(bs *[]byte) { + p.Pool.Put(bs) +} From 1b11bba548a9fc362c14bdb4844d25e3971183a8 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 15 Mar 2023 20:24:35 +0900 Subject: [PATCH 13/36] rpc_util: add a copyright --- shared_buffer_pool.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 2d58b1eff516..61ef993e368d 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -1,3 +1,21 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package grpc import "sync" From a44c2005e3ada9f57680c1ac8ae8678e1963f253 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Thu, 16 Mar 2023 05:04:13 +0900 Subject: [PATCH 14/36] rpc_util: set the default value of the sharedRecvBufferPool to nil --- benchmark/benchmain/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 6bf2c0ba2a0a..b723b8e53d22 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -109,7 +109,7 @@ var ( clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list") - sharedRecvBufferPool = flags.StringWithAllowedValues("sharedRecvBufferPool", sharedRecvBufferPoolAll, "Configures the shared receive buffer pool. One of: nil, simple", allSharedRecvBufferPools) + sharedRecvBufferPool = flags.StringWithAllowedValues("sharedRecvBufferPool", sharedRecvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allSharedRecvBufferPools) logger = grpclog.Component("benchmark") ) From 6b5000f634b92d5c745f517db92fcaed20c16ddc Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Thu, 16 Mar 2023 05:23:27 +0900 Subject: [PATCH 15/36] rpc_util: add experimental notices to SharedBufferPool --- shared_buffer_pool.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 61ef993e368d..dacc8a311a98 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -21,12 +21,22 @@ package grpc import "sync" // SharedBufferPool is a pool of buffers that can be shared. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. type SharedBufferPool interface { Get(length int) []byte Put(*[]byte) } // NewSimpleSharedBufferPool creates a new SimpleSharedBufferPool. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. func NewSimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ Pool: sync.Pool{ From c34da60d9f4432904743212fe385af436c38e02f Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Thu, 16 Mar 2023 05:32:21 +0900 Subject: [PATCH 16/36] rpc_util: provide more detail in SharedBufferPool comment --- shared_buffer_pool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index dacc8a311a98..4f4a9c14492c 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -20,7 +20,9 @@ package grpc import "sync" -// SharedBufferPool is a pool of buffers that can be shared. +// SharedBufferPool is a pool of buffers that can be shared, resulting in +// decreased memory allocation. Currently, in gRPC-go, it is only utilized +// for parsing incoming messages. // // # Experimental // From acec62662063acfd13ec30e74ebec877efadd777 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Thu, 16 Mar 2023 05:51:05 +0900 Subject: [PATCH 17/36] rpc_util: use multiple buckets for the simpleSharedBufferPool --- shared_buffer_pool.go | 81 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 9 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 4f4a9c14492c..d27a0eb701f2 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -33,7 +33,9 @@ type SharedBufferPool interface { Put(*[]byte) } -// NewSimpleSharedBufferPool creates a new SimpleSharedBufferPool. +// NewSimpleSharedBufferPool creates a new SimpleSharedBufferPool with buckets +// of different sizes to optimize memory usage. This prevents the pool from +// wasting large amounts of memory, even when handling messages of varying sizes. // // # Experimental // @@ -41,21 +43,72 @@ type SharedBufferPool interface { // later release. func NewSimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ - Pool: sync.Pool{ - New: func() interface{} { - bs := make([]byte, 0) - return &bs - }, - }, + pool0: makeBytesPool(), + pool1: makeBytesPool(), + pool2: makeBytesPool(), + pool3: makeBytesPool(), + pool4: makeBytesPool(), + poolMax: makeBytesPool(), } } // simpleSharedBufferPool is a simple implementation of SharedBufferPool. type simpleSharedBufferPool struct { - sync.Pool + pool0 bufferPool + pool1 bufferPool + pool2 bufferPool + pool3 bufferPool + pool4 bufferPool + poolMax bufferPool } func (p *simpleSharedBufferPool) Get(size int) []byte { + switch { + case size <= level0PoolMaxSize: + return p.pool0.Get(size) + case size <= level1PoolMaxSize: + return p.pool1.Get(size) + case size <= level2PoolMaxSize: + return p.pool2.Get(size) + case size <= level3PoolMaxSize: + return p.pool3.Get(size) + case size <= level4PoolMaxSize: + return p.pool4.Get(size) + default: + return p.poolMax.Get(size) + } +} + +func (p *simpleSharedBufferPool) Put(bs *[]byte) { + switch size := cap(*bs); { + case size <= level0PoolMaxSize: + p.pool0.Put(bs) + case size <= level1PoolMaxSize: + p.pool1.Put(bs) + case size <= level2PoolMaxSize: + p.pool2.Put(bs) + case size <= level3PoolMaxSize: + p.pool3.Put(bs) + case size <= level4PoolMaxSize: + p.pool4.Put(bs) + default: + p.poolMax.Put(bs) + } +} + +const ( + level0PoolMaxSize = 16 + level1PoolMaxSize = level0PoolMaxSize * 16 + level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB + level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB + level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB +) + +type bufferPool struct { + sync.Pool +} + +func (p *bufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) if cap(*bs) < size { *bs = make([]byte, size) @@ -65,6 +118,16 @@ func (p *simpleSharedBufferPool) Get(size int) []byte { return (*bs)[:size] } -func (p *simpleSharedBufferPool) Put(bs *[]byte) { +func (p *bufferPool) Put(bs *[]byte) { p.Pool.Put(bs) } + +func makeBytesPool() bufferPool { + return bufferPool{ + sync.Pool{ + New: func() interface{} { + return new([]byte) + }, + }, + } +} From 3cbb6b5df536f944b53c9bec30abbacefe54b1d9 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 18 Mar 2023 07:07:01 +0900 Subject: [PATCH 18/36] rpc_util: add pool size comments --- shared_buffer_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index d27a0eb701f2..86727385c7e1 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -97,8 +97,8 @@ func (p *simpleSharedBufferPool) Put(bs *[]byte) { } const ( - level0PoolMaxSize = 16 - level1PoolMaxSize = level0PoolMaxSize * 16 + level0PoolMaxSize = 16 // 16 B + level1PoolMaxSize = level0PoolMaxSize * 16 // 256 B level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB From 57b9c67038c51d0e9df37d9932107e99ae28628c Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 18 Mar 2023 07:15:08 +0900 Subject: [PATCH 19/36] rpc_util: add a SharedBufferPool function comments --- shared_buffer_pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 86727385c7e1..873d91b6da96 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -29,7 +29,10 @@ import "sync" // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. type SharedBufferPool interface { + // Get returns a buffer with specified length from the pool. Get(length int) []byte + + // Put returns a buffer to the pool. Put(*[]byte) } From 76caf7405ba2e91dd7b4e9d31304b63f1d199d7f Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 18 Mar 2023 07:25:09 +0900 Subject: [PATCH 20/36] rpc_util: use preconfigured size buffers for each pool --- shared_buffer_pool.go | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 873d91b6da96..4a004b2c18f5 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -46,12 +46,12 @@ type SharedBufferPool interface { // later release. func NewSimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ - pool0: makeBytesPool(), - pool1: makeBytesPool(), - pool2: makeBytesPool(), - pool3: makeBytesPool(), - pool4: makeBytesPool(), - poolMax: makeBytesPool(), + pool0: makeBytesPool(level0PoolMaxSize), + pool1: makeBytesPool(level1PoolMaxSize), + pool2: makeBytesPool(level2PoolMaxSize), + pool3: makeBytesPool(level3PoolMaxSize), + pool4: makeBytesPool(level4PoolMaxSize), + poolMax: makeFallbackBytesPool(), } } @@ -113,6 +113,31 @@ type bufferPool struct { func (p *bufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) + + return (*bs)[:size] +} + +func (p *bufferPool) Put(bs *[]byte) { + p.Pool.Put(bs) +} + +func makeBytesPool(size int) bufferPool { + return bufferPool{ + sync.Pool{ + New: func() interface{} { + bs := make([]byte, size) + return &bs + }, + }, + } +} + +type fallbackBufferPool struct { + sync.Pool +} + +func (p *fallbackBufferPool) Get(size int) []byte { + bs := p.Pool.Get().(*[]byte) if cap(*bs) < size { *bs = make([]byte, size) return *bs @@ -121,11 +146,11 @@ func (p *bufferPool) Get(size int) []byte { return (*bs)[:size] } -func (p *bufferPool) Put(bs *[]byte) { +func (p *fallbackBufferPool) Put(bs *[]byte) { p.Pool.Put(bs) } -func makeBytesPool() bufferPool { +func makeFallbackBytesPool() bufferPool { return bufferPool{ sync.Pool{ New: func() interface{} { From 8f33b9bb821eaea4e5881861170005d0830b861a Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 22 Mar 2023 14:20:39 +0900 Subject: [PATCH 21/36] rpc_util: remove duplicate Put function --- shared_buffer_pool.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 4a004b2c18f5..87edbceb47b8 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -117,10 +117,6 @@ func (p *bufferPool) Get(size int) []byte { return (*bs)[:size] } -func (p *bufferPool) Put(bs *[]byte) { - p.Pool.Put(bs) -} - func makeBytesPool(size int) bufferPool { return bufferPool{ sync.Pool{ @@ -146,10 +142,6 @@ func (p *fallbackBufferPool) Get(size int) []byte { return (*bs)[:size] } -func (p *fallbackBufferPool) Put(bs *[]byte) { - p.Pool.Put(bs) -} - func makeFallbackBytesPool() bufferPool { return bufferPool{ sync.Pool{ From 515556682270ce11a49bea548a2d31ca7eccdbe6 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sun, 9 Apr 2023 15:04:25 +0900 Subject: [PATCH 22/36] rpc_util: add default recv buffer pool with nop functionality This change eliminates the need for nil checks on the shared recv buffer pool, resulting in a more simplified codebase. --- dialoptions.go | 1 + rpc_util.go | 18 +++--------------- rpc_util_test.go | 4 ++-- server.go | 1 + shared_buffer_pool.go | 12 ++++++++++++ 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index a01aa3595390..34ba84518677 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -619,6 +619,7 @@ func defaultDialOptions() dialOptions { ReadBufferSize: defaultReadBufSize, UseProxy: true, }, + sharedRecvBufferPool: nopBufferPool{}, } } diff --git a/rpc_util.go b/rpc_util.go index 756a4151ae1d..e66bbe06daf1 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -613,11 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - if p.sharedRecvBufferPool != nil { - msg = p.sharedRecvBufferPool.Get(int(length)) - } else { - msg = make([]byte, int(length)) - } + msg = p.sharedRecvBufferPool.Get(int(length)) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -805,16 +801,8 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err) } if payInfo != nil { - if p.sharedRecvBufferPool != nil { - if len(buf) != 0 { - payInfo.uncompressedBytes = make([]byte, len(buf)) - copy(payInfo.uncompressedBytes, buf) - } - } else { - payInfo.uncompressedBytes = buf - } - } - if p.sharedRecvBufferPool != nil { + payInfo.uncompressedBytes = buf + } else { p.sharedRecvBufferPool.Put(&buf) } return nil diff --git a/rpc_util_test.go b/rpc_util_test.go index 90912d52a226..d9d4b6fa53fc 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -65,7 +65,7 @@ func (s) TestSimpleParsing(t *testing.T) { {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone}, } { buf := fullReader{bytes.NewReader(test.p)} - parser := &parser{r: buf} + parser := &parser{r: buf, sharedRecvBufferPool: nopBufferPool{}} pt, b, err := parser.recvMsg(math.MaxInt32) if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt { t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err) @@ -77,7 +77,7 @@ func (s) TestMultipleParsing(t *testing.T) { // Set a byte stream consists of 3 messages with their headers. p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'} b := fullReader{bytes.NewReader(p)} - parser := &parser{r: b} + parser := &parser{r: b, sharedRecvBufferPool: nopBufferPool{}} wantRecvs := []struct { pt payloadFormat diff --git a/server.go b/server.go index 086c45807ae3..75199bc26afb 100644 --- a/server.go +++ b/server.go @@ -184,6 +184,7 @@ var defaultServerOptions = serverOptions{ connectionTimeout: 120 * time.Second, writeBufferSize: defaultWriteBufSize, readBufferSize: defaultReadBufSize, + sharedRecvBufferPool: nopBufferPool{}, } var globalServerOptions []ServerOption diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 87edbceb47b8..cff0c0a8dd85 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -151,3 +151,15 @@ func makeFallbackBytesPool() bufferPool { }, } } + +// nopBufferPool is a buffer pool just makes new buffer without pooling. +type nopBufferPool struct { +} + +func (nopBufferPool) Get(length int) []byte { + return make([]byte, length) +} + +func (nopBufferPool) Put(*[]byte) { + +} From 15f820e51f3622eb0a79185fd985d117f4f1c044 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sun, 9 Apr 2023 15:12:13 +0900 Subject: [PATCH 23/36] rpc_util: use array for the simple shared buffer pool --- shared_buffer_pool.go | 64 +++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index cff0c0a8dd85..a1c1436d06a4 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -46,56 +46,44 @@ type SharedBufferPool interface { // later release. func NewSimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ - pool0: makeBytesPool(level0PoolMaxSize), - pool1: makeBytesPool(level1PoolMaxSize), - pool2: makeBytesPool(level2PoolMaxSize), - pool3: makeBytesPool(level3PoolMaxSize), - pool4: makeBytesPool(level4PoolMaxSize), - poolMax: makeFallbackBytesPool(), + pools: [poolArraySize]bufferPool{ + makeBytesPool(level0PoolMaxSize), + makeBytesPool(level1PoolMaxSize), + makeBytesPool(level2PoolMaxSize), + makeBytesPool(level3PoolMaxSize), + makeBytesPool(level4PoolMaxSize), + makeFallbackBytesPool(), + }, } } // simpleSharedBufferPool is a simple implementation of SharedBufferPool. type simpleSharedBufferPool struct { - pool0 bufferPool - pool1 bufferPool - pool2 bufferPool - pool3 bufferPool - pool4 bufferPool - poolMax bufferPool + pools [poolArraySize]bufferPool } func (p *simpleSharedBufferPool) Get(size int) []byte { - switch { - case size <= level0PoolMaxSize: - return p.pool0.Get(size) - case size <= level1PoolMaxSize: - return p.pool1.Get(size) - case size <= level2PoolMaxSize: - return p.pool2.Get(size) - case size <= level3PoolMaxSize: - return p.pool3.Get(size) - case size <= level4PoolMaxSize: - return p.pool4.Get(size) - default: - return p.poolMax.Get(size) - } + return p.pools[p.poolIdx(size)].Get(size) } func (p *simpleSharedBufferPool) Put(bs *[]byte) { - switch size := cap(*bs); { + p.pools[p.poolIdx(cap(*bs))].Put(bs) +} + +func (p *simpleSharedBufferPool) poolIdx(size int) int { + switch { case size <= level0PoolMaxSize: - p.pool0.Put(bs) + return level0PoolIdx case size <= level1PoolMaxSize: - p.pool1.Put(bs) + return level1PoolIdx case size <= level2PoolMaxSize: - p.pool2.Put(bs) + return level2PoolIdx case size <= level3PoolMaxSize: - p.pool3.Put(bs) + return level3PoolIdx case size <= level4PoolMaxSize: - p.pool4.Put(bs) + return level4PoolIdx default: - p.poolMax.Put(bs) + return levelMaxPoolIdx } } @@ -107,6 +95,16 @@ const ( level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB ) +const ( + level0PoolIdx = iota + level1PoolIdx + level2PoolIdx + level3PoolIdx + level4PoolIdx + levelMaxPoolIdx + poolArraySize +) + type bufferPool struct { sync.Pool } From 056b3e5931f55e95a4d19bc1a1ce83e6fb7bc4bb Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sun, 9 Apr 2023 16:12:45 +0900 Subject: [PATCH 24/36] rpc_util: use fallback bytes pool for the simple shared buffer pool --- shared_buffer_pool.go | 29 +++++++++++--------- shared_buffer_pool_test.go | 54 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 12 deletions(-) create mode 100644 shared_buffer_pool_test.go diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index a1c1436d06a4..f1d41e6395a0 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -46,20 +46,20 @@ type SharedBufferPool interface { // later release. func NewSimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ - pools: [poolArraySize]bufferPool{ - makeBytesPool(level0PoolMaxSize), - makeBytesPool(level1PoolMaxSize), - makeBytesPool(level2PoolMaxSize), - makeBytesPool(level3PoolMaxSize), - makeBytesPool(level4PoolMaxSize), - makeFallbackBytesPool(), + pools: [poolArraySize]simpleSharedBufferChildPool{ + newBytesPool(level0PoolMaxSize), + newBytesPool(level1PoolMaxSize), + newBytesPool(level2PoolMaxSize), + newBytesPool(level3PoolMaxSize), + newBytesPool(level4PoolMaxSize), + newFallbackBytesPool(), }, } } // simpleSharedBufferPool is a simple implementation of SharedBufferPool. type simpleSharedBufferPool struct { - pools [poolArraySize]bufferPool + pools [poolArraySize]simpleSharedBufferChildPool } func (p *simpleSharedBufferPool) Get(size int) []byte { @@ -105,6 +105,11 @@ const ( poolArraySize ) +type simpleSharedBufferChildPool interface { + Get(size int) []byte + Put(interface{}) +} + type bufferPool struct { sync.Pool } @@ -115,8 +120,8 @@ func (p *bufferPool) Get(size int) []byte { return (*bs)[:size] } -func makeBytesPool(size int) bufferPool { - return bufferPool{ +func newBytesPool(size int) simpleSharedBufferChildPool { + return &bufferPool{ sync.Pool{ New: func() interface{} { bs := make([]byte, size) @@ -140,8 +145,8 @@ func (p *fallbackBufferPool) Get(size int) []byte { return (*bs)[:size] } -func makeFallbackBytesPool() bufferPool { - return bufferPool{ +func newFallbackBytesPool() simpleSharedBufferChildPool { + return &fallbackBufferPool{ sync.Pool{ New: func() interface{} { return new([]byte) diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go new file mode 100644 index 000000000000..96b0d8e47601 --- /dev/null +++ b/shared_buffer_pool_test.go @@ -0,0 +1,54 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import "testing" + +func (s) TestSharedBufferPool(t *testing.T) { + pools := []SharedBufferPool{ + nopBufferPool{}, + NewSimpleSharedBufferPool(), + } + + lengths := []int{ + level4PoolMaxSize + 1, + level4PoolMaxSize, + level3PoolMaxSize, + level2PoolMaxSize, + level1PoolMaxSize, + level0PoolMaxSize, + } + + for _, p := range pools { + for _, l := range lengths { + bs := p.Get(l) + if len(bs) != l { + t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) + } + + for i, b := range bs { + if b != 0 { + t.Fatalf("Expected buffer to be zeroed, got %d at index %d", b, i) + } + } + + p.Put(&bs) + } + } +} From 0d07cfc3dfcc1bc2c21890e7d1efc7c40b404e1e Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Tue, 11 Apr 2023 18:02:21 +0900 Subject: [PATCH 25/36] rpc_util: remove a wrong test - Our shared buffer pool does not provide initialization, but it performs better. Users must initialize data from the pool themselves if necessary. --- shared_buffer_pool.go | 2 ++ shared_buffer_pool_test.go | 6 ------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index f1d41e6395a0..f17857c1520a 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -30,6 +30,8 @@ import "sync" // later release. type SharedBufferPool interface { // Get returns a buffer with specified length from the pool. + // + // The returned byte slice may be not zero initialized. Get(length int) []byte // Put returns a buffer to the pool. diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go index 96b0d8e47601..b1416f68bf33 100644 --- a/shared_buffer_pool_test.go +++ b/shared_buffer_pool_test.go @@ -42,12 +42,6 @@ func (s) TestSharedBufferPool(t *testing.T) { t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) } - for i, b := range bs { - if b != 0 { - t.Fatalf("Expected buffer to be zeroed, got %d at index %d", b, i) - } - } - p.Put(&bs) } } From ca9f6deb694da7fe6c68a71e377e71e68b372247 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 19 Apr 2023 21:47:24 +0900 Subject: [PATCH 26/36] rpc_util: add limitation comment for the shared buffer pool --- dialoptions.go | 4 ++++ server.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/dialoptions.go b/dialoptions.go index 7c312881d953..a55b4bdb39cc 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -665,6 +665,10 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // If you are unsure about how to implement a memory pool but want to utilize one, // begin with grpc.NewSimpleSharedBufferPool. // +// Note: The shared buffer pool feature will not be active if any of the following +// options are used: WithStatsHandler, EnableTracing, or binary logging. In such +// cases, the shared buffer pool will be ignored. +// // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a diff --git a/server.go b/server.go index f1c71d99b958..25e981813c3a 100644 --- a/server.go +++ b/server.go @@ -561,6 +561,10 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // If you are unsure about how to implement a memory pool but want to utilize one, // begin with grpc.NewSimpleSharedBufferPool. // +// Note: The shared buffer pool feature will not be active if any of the following +// options are used: StatsHandler, EnableTracing, or binary logging. In such +// cases, the shared buffer pool will be ignored. +// // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a From 9be78899e3d38da51d696c13f89390d267c01da1 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 19 Apr 2023 21:52:21 +0900 Subject: [PATCH 27/36] rpc_util: exclude shared prefix from shared recv buffer pool --- benchmark/benchmain/main.go | 40 ++++++++++++++++++------------------- benchmark/stats/stats.go | 14 ++++++------- dialoptions.go | 10 +++++----- rpc_util.go | 8 ++++---- rpc_util_test.go | 4 ++-- server.go | 14 ++++++------- stream.go | 4 ++-- 7 files changed, 47 insertions(+), 47 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index b723b8e53d22..1f7bb3976feb 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -109,7 +109,7 @@ var ( clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list") serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list") - sharedRecvBufferPool = flags.StringWithAllowedValues("sharedRecvBufferPool", sharedRecvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allSharedRecvBufferPools) + recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools) logger = grpclog.Component("benchmark") ) @@ -135,9 +135,9 @@ const ( networkModeWAN = "WAN" networkLongHaul = "Longhaul" // Shared recv buffer pool - sharedRecvBufferPoolNil = "nil" - sharedRecvBufferPoolSimple = "simple" - sharedRecvBufferPoolAll = "all" + recvBufferPoolNil = "nil" + recvBufferPoolSimple = "simple" + recvBufferPoolAll = "all" numStatsBuckets = 10 warmupCallCount = 10 @@ -149,7 +149,7 @@ var ( allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll} allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth} allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul} - allSharedRecvBufferPools = []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple, sharedRecvBufferPoolAll} + allRecvBufferPools = []string{recvBufferPoolNil, recvBufferPoolSimple, recvBufferPoolAll} defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. defaultReadKbps = []int{0, 10240} // if non-positive, infinite defaultReadMTU = []int{0} // if non-positive, infinite @@ -327,14 +327,14 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { if bf.ServerWriteBufferSize >= 0 { sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize)) } - switch bf.SharedRecvBufferPool { - case sharedRecvBufferPoolNil: + switch bf.RecvBufferPool { + case recvBufferPoolNil: // Do nothing. - case sharedRecvBufferPoolSimple: - opts = append(opts, grpc.WithSharedRecvBufferPool(grpc.NewSimpleSharedBufferPool())) - sopts = append(sopts, grpc.SharedRecvBufferPool(grpc.NewSimpleSharedBufferPool())) + case recvBufferPoolSimple: + opts = append(opts, grpc.WithRecvBufferPool(grpc.NewSimpleSharedBufferPool())) + sopts = append(sopts, grpc.RecvBufferPool(grpc.NewSimpleSharedBufferPool())) default: - logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.SharedRecvBufferPool) + logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool) } sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) @@ -543,7 +543,7 @@ type featureOpts struct { clientWriteBufferSize []int serverReadBufferSize []int serverWriteBufferSize []int - sharedRecvBufferPools []string + recvBufferPools []string } // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each @@ -588,8 +588,8 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.serverReadBufferSize) case stats.ServerWriteBufferSize: featuresNum[i] = len(b.features.serverWriteBufferSize) - case stats.SharedRecvBufferPool: - featuresNum[i] = len(b.features.sharedRecvBufferPools) + case stats.RecvBufferPool: + featuresNum[i] = len(b.features.recvBufferPools) default: log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } @@ -656,7 +656,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]], ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]], ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]], - SharedRecvBufferPool: b.features.sharedRecvBufferPools[curPos[stats.SharedRecvBufferPool]], + RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]], } if len(b.features.reqPayloadCurves) == 0 { f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] @@ -727,7 +727,7 @@ func processFlags() *benchOpts { clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...), serverReadBufferSize: append([]int(nil), *serverReadBufferSize...), serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...), - sharedRecvBufferPools: setSharedRecvBufferPool(*sharedRecvBufferPool), + recvBufferPools: setRecvBufferPool(*recvBufferPool), }, } @@ -803,12 +803,12 @@ func setCompressorMode(val string) []string { } } -func setSharedRecvBufferPool(val string) []string { +func setRecvBufferPool(val string) []string { switch val { - case sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple: + case recvBufferPoolNil, recvBufferPoolSimple: return []string{val} - case sharedRecvBufferPoolAll: - return []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple} + case recvBufferPoolAll: + return []string{recvBufferPoolNil, recvBufferPoolSimple} default: // This should never happen because a wrong value passed to this flag would // be caught during flag.Parse(). diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index ec0cde056d44..1f254f78e978 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -56,7 +56,7 @@ const ( ClientWriteBufferSize ServerReadBufferSize ServerWriteBufferSize - SharedRecvBufferPool + RecvBufferPool // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -122,8 +122,8 @@ type Features struct { ServerReadBufferSize int // ServerWriteBufferSize is the size of the server write buffer in bytes. If negative, use the default buffer size. ServerWriteBufferSize int - // SharedRecvBufferPool represents the shared recv buffer pool used. - SharedRecvBufferPool string + // RecvBufferPool represents the shared recv buffer pool used. + RecvBufferPool string } // String returns all the feature values as a string. @@ -143,12 +143,12 @@ func (f Features) String() string { "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+ "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+ - "sharedRecvBufferPool_%v-", + "recvBufferPool_%v-", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader, f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize, - f.ServerWriteBufferSize, f.SharedRecvBufferPool) + f.ServerWriteBufferSize, f.RecvBufferPool) } // SharedFeatures returns the shared features as a pretty printable string. @@ -220,8 +220,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim)) case ServerWriteBufferSize: b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim)) - case SharedRecvBufferPool: - b.WriteString(fmt.Sprintf("SharedRecvBufferPool%v%v%v", sep, f.SharedRecvBufferPool, delim)) + case RecvBufferPool: + b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim)) default: log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) } diff --git a/dialoptions.go b/dialoptions.go index a55b4bdb39cc..83074758652d 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -77,7 +77,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder - sharedRecvBufferPool SharedBufferPool + recvBufferPool SharedBufferPool } // DialOption configures how we set up the connection. @@ -628,7 +628,7 @@ func defaultDialOptions() dialOptions { ReadBufferSize: defaultReadBufSize, UseProxy: true, }, - sharedRecvBufferPool: nopBufferPool{}, + recvBufferPool: nopBufferPool{}, } } @@ -658,7 +658,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption { }) } -// WithSharedRecvBufferPool returns a DialOption that configures the ClientConn +// WithRecvBufferPool returns a DialOption that configures the ClientConn // to use the provided shared buffer pool for parsing incoming messages. Depending // on the application's workload, this could result in reduced memory allocation. // @@ -673,8 +673,8 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithSharedRecvBufferPool(bufferPool SharedBufferPool) DialOption { +func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.sharedRecvBufferPool = bufferPool + o.recvBufferPool = bufferPool }) } diff --git a/rpc_util.go b/rpc_util.go index e66bbe06daf1..a844d28f49d0 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -578,8 +578,8 @@ type parser struct { // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte - // sharedRecvBufferPool is the pool of shared receive buffers. - sharedRecvBufferPool SharedBufferPool + // recvBufferPool is the pool of shared receive buffers. + recvBufferPool SharedBufferPool } // recvMsg reads a complete gRPC message from the stream. @@ -613,7 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - msg = p.sharedRecvBufferPool.Get(int(length)) + msg = p.recvBufferPool.Get(int(length)) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -803,7 +803,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf if payInfo != nil { payInfo.uncompressedBytes = buf } else { - p.sharedRecvBufferPool.Put(&buf) + p.recvBufferPool.Put(&buf) } return nil } diff --git a/rpc_util_test.go b/rpc_util_test.go index d9d4b6fa53fc..84f2348655b9 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -65,7 +65,7 @@ func (s) TestSimpleParsing(t *testing.T) { {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone}, } { buf := fullReader{bytes.NewReader(test.p)} - parser := &parser{r: buf, sharedRecvBufferPool: nopBufferPool{}} + parser := &parser{r: buf, recvBufferPool: nopBufferPool{}} pt, b, err := parser.recvMsg(math.MaxInt32) if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt { t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err) @@ -77,7 +77,7 @@ func (s) TestMultipleParsing(t *testing.T) { // Set a byte stream consists of 3 messages with their headers. p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'} b := fullReader{bytes.NewReader(p)} - parser := &parser{r: b, sharedRecvBufferPool: nopBufferPool{}} + parser := &parser{r: b, recvBufferPool: nopBufferPool{}} wantRecvs := []struct { pt payloadFormat diff --git a/server.go b/server.go index 25e981813c3a..7c8e83622593 100644 --- a/server.go +++ b/server.go @@ -174,7 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 - sharedRecvBufferPool SharedBufferPool + recvBufferPool SharedBufferPool } var defaultServerOptions = serverOptions{ @@ -183,7 +183,7 @@ var defaultServerOptions = serverOptions{ connectionTimeout: 120 * time.Second, writeBufferSize: defaultWriteBufSize, readBufferSize: defaultReadBufSize, - sharedRecvBufferPool: nopBufferPool{}, + recvBufferPool: nopBufferPool{}, } var globalServerOptions []ServerOption @@ -554,7 +554,7 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } -// SharedRecvBufferPool returns a ServerOption that configures the server +// RecvBufferPool returns a ServerOption that configures the server // to use the provided shared buffer pool for parsing incoming messages. Depending // on the application's workload, this could result in reduced memory allocation. // @@ -569,9 +569,9 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func SharedRecvBufferPool(bufferPool SharedBufferPool) ServerOption { +func RecvBufferPool(bufferPool SharedBufferPool) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.sharedRecvBufferPool = bufferPool + o.recvBufferPool = bufferPool }) } @@ -1319,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if len(shs) != 0 || len(binlogs) != 0 { payInfo = &payloadInfo{} } - d, err := recvAndDecompress(&parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) + d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) if err != nil { if e := t.WriteStatus(stream, status.Convert(err)); e != nil { channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) @@ -1529,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ctx: ctx, t: t, s: stream, - p: &parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, + p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, codec: s.getCodec(stream.ContentSubtype()), maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, diff --git a/stream.go b/stream.go index 80c46cf6e76e..8b4d492323b8 100644 --- a/stream.go +++ b/stream.go @@ -499,7 +499,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s, sharedRecvBufferPool: a.cs.cc.dopts.sharedRecvBufferPool} + a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool} return nil } @@ -1262,7 +1262,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s, sharedRecvBufferPool: ac.dopts.sharedRecvBufferPool} + as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the From 25b60e39d0d54fcaebfe2389b517826c22ebaade Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 19 Apr 2023 21:55:07 +0900 Subject: [PATCH 28/36] rpc_util: rename NewSimpleSharedBufferPool to NewsimpleSharedBufferPool --- benchmark/benchmain/main.go | 4 ++-- dialoptions.go | 2 +- server.go | 2 +- shared_buffer_pool.go | 4 ++-- shared_buffer_pool_test.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 1f7bb3976feb..33e67ff84eed 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -331,8 +331,8 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { case recvBufferPoolNil: // Do nothing. case recvBufferPoolSimple: - opts = append(opts, grpc.WithRecvBufferPool(grpc.NewSimpleSharedBufferPool())) - sopts = append(sopts, grpc.RecvBufferPool(grpc.NewSimpleSharedBufferPool())) + opts = append(opts, grpc.WithRecvBufferPool(grpc.NewsimpleSharedBufferPool())) + sopts = append(sopts, grpc.RecvBufferPool(grpc.NewsimpleSharedBufferPool())) default: logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool) } diff --git a/dialoptions.go b/dialoptions.go index 83074758652d..48c1f07f54e5 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -663,7 +663,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // on the application's workload, this could result in reduced memory allocation. // // If you are unsure about how to implement a memory pool but want to utilize one, -// begin with grpc.NewSimpleSharedBufferPool. +// begin with grpc.NewsimpleSharedBufferPool. // // Note: The shared buffer pool feature will not be active if any of the following // options are used: WithStatsHandler, EnableTracing, or binary logging. In such diff --git a/server.go b/server.go index 7c8e83622593..3adbfa22f247 100644 --- a/server.go +++ b/server.go @@ -559,7 +559,7 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // on the application's workload, this could result in reduced memory allocation. // // If you are unsure about how to implement a memory pool but want to utilize one, -// begin with grpc.NewSimpleSharedBufferPool. +// begin with grpc.NewsimpleSharedBufferPool. // // Note: The shared buffer pool feature will not be active if any of the following // options are used: StatsHandler, EnableTracing, or binary logging. In such diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index f17857c1520a..7995d8ee7ddf 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -38,7 +38,7 @@ type SharedBufferPool interface { Put(*[]byte) } -// NewSimpleSharedBufferPool creates a new SimpleSharedBufferPool with buckets +// NewsimpleSharedBufferPool creates a new simpleSharedBufferPool with buckets // of different sizes to optimize memory usage. This prevents the pool from // wasting large amounts of memory, even when handling messages of varying sizes. // @@ -46,7 +46,7 @@ type SharedBufferPool interface { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func NewSimpleSharedBufferPool() SharedBufferPool { +func NewsimpleSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ pools: [poolArraySize]simpleSharedBufferChildPool{ newBytesPool(level0PoolMaxSize), diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go index b1416f68bf33..b7960179e145 100644 --- a/shared_buffer_pool_test.go +++ b/shared_buffer_pool_test.go @@ -23,7 +23,7 @@ import "testing" func (s) TestSharedBufferPool(t *testing.T) { pools := []SharedBufferPool{ nopBufferPool{}, - NewSimpleSharedBufferPool(), + NewsimpleSharedBufferPool(), } lengths := []int{ From 8e8f683ac72b7b74e0bbcee2b6e14fce116dab65 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 19 Apr 2023 23:26:52 +0900 Subject: [PATCH 29/36] rpc_util: add a TestRecvBufferPool --- test/recv_buffer_pool_test.go | 90 +++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 test/recv_buffer_pool_test.go diff --git a/test/recv_buffer_pool_test.go b/test/recv_buffer_pool_test.go new file mode 100644 index 000000000000..2289fef4358b --- /dev/null +++ b/test/recv_buffer_pool_test.go @@ -0,0 +1,90 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/internal/stubserver" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +func (s) TestRecvBufferPool(t *testing.T) { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for i := 0; i < 10; i++ { + preparedMsg := &grpc.PreparedMsg{} + err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{ + Payload: &testpb.Payload{ + Body: []byte{'0' + uint8(i)}, + }, + }) + if err != nil { + return err + } + stream.SendMsg(preparedMsg) + } + return nil + }, + } + if err := ss.Start( + []grpc.ServerOption{grpc.RecvBufferPool(grpc.NewsimpleSharedBufferPool())}, + grpc.WithRecvBufferPool(grpc.NewsimpleSharedBufferPool()), + ); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + var ngot int + var buf bytes.Buffer + for { + reply, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + ngot++ + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.Write(reply.GetPayload().GetBody()) + } + if want := 10; ngot != want { + t.Errorf("Got %d replies, want %d", ngot, want) + } + if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { + t.Errorf("Got replies %q; want %q", got, want) + } +} From 96f5a276daf93ec80162d7e58903abef081a1c72 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 20 May 2023 14:53:48 +0900 Subject: [PATCH 30/36] rpc_util: rename NewsimpleSharedBufferPool to NewSharedBufferPool --- benchmark/benchmain/main.go | 4 ++-- dialoptions.go | 2 +- server.go | 2 +- shared_buffer_pool.go | 8 ++++---- shared_buffer_pool_test.go | 2 +- test/recv_buffer_pool_test.go | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 33e67ff84eed..516514c276a1 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -331,8 +331,8 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { case recvBufferPoolNil: // Do nothing. case recvBufferPoolSimple: - opts = append(opts, grpc.WithRecvBufferPool(grpc.NewsimpleSharedBufferPool())) - sopts = append(sopts, grpc.RecvBufferPool(grpc.NewsimpleSharedBufferPool())) + opts = append(opts, grpc.WithRecvBufferPool(grpc.NewSharedBufferPool())) + sopts = append(sopts, grpc.RecvBufferPool(grpc.NewSharedBufferPool())) default: logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool) } diff --git a/dialoptions.go b/dialoptions.go index 48c1f07f54e5..4314452996bf 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -663,7 +663,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption { // on the application's workload, this could result in reduced memory allocation. // // If you are unsure about how to implement a memory pool but want to utilize one, -// begin with grpc.NewsimpleSharedBufferPool. +// begin with grpc.NewSharedBufferPool. // // Note: The shared buffer pool feature will not be active if any of the following // options are used: WithStatsHandler, EnableTracing, or binary logging. In such diff --git a/server.go b/server.go index 3adbfa22f247..a25f25606552 100644 --- a/server.go +++ b/server.go @@ -559,7 +559,7 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { // on the application's workload, this could result in reduced memory allocation. // // If you are unsure about how to implement a memory pool but want to utilize one, -// begin with grpc.NewsimpleSharedBufferPool. +// begin with grpc.NewSharedBufferPool. // // Note: The shared buffer pool feature will not be active if any of the following // options are used: StatsHandler, EnableTracing, or binary logging. In such diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 7995d8ee7ddf..386cbc3119cf 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -38,7 +38,7 @@ type SharedBufferPool interface { Put(*[]byte) } -// NewsimpleSharedBufferPool creates a new simpleSharedBufferPool with buckets +// NewSharedBufferPool creates a simple SharedBufferPool with buckets // of different sizes to optimize memory usage. This prevents the pool from // wasting large amounts of memory, even when handling messages of varying sizes. // @@ -46,7 +46,7 @@ type SharedBufferPool interface { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func NewsimpleSharedBufferPool() SharedBufferPool { +func NewSharedBufferPool() SharedBufferPool { return &simpleSharedBufferPool{ pools: [poolArraySize]simpleSharedBufferChildPool{ newBytesPool(level0PoolMaxSize), @@ -139,8 +139,8 @@ type fallbackBufferPool struct { func (p *fallbackBufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) - if cap(*bs) < size { - *bs = make([]byte, size) + if cap(*bs) < size { + *bs = make([]byte, size) return *bs } diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go index b7960179e145..f5ed7c8314f1 100644 --- a/shared_buffer_pool_test.go +++ b/shared_buffer_pool_test.go @@ -23,7 +23,7 @@ import "testing" func (s) TestSharedBufferPool(t *testing.T) { pools := []SharedBufferPool{ nopBufferPool{}, - NewsimpleSharedBufferPool(), + NewSharedBufferPool(), } lengths := []int{ diff --git a/test/recv_buffer_pool_test.go b/test/recv_buffer_pool_test.go index 2289fef4358b..8bb6db4a77af 100644 --- a/test/recv_buffer_pool_test.go +++ b/test/recv_buffer_pool_test.go @@ -50,8 +50,8 @@ func (s) TestRecvBufferPool(t *testing.T) { }, } if err := ss.Start( - []grpc.ServerOption{grpc.RecvBufferPool(grpc.NewsimpleSharedBufferPool())}, - grpc.WithRecvBufferPool(grpc.NewsimpleSharedBufferPool()), + []grpc.ServerOption{grpc.RecvBufferPool(grpc.NewSharedBufferPool())}, + grpc.WithRecvBufferPool(grpc.NewSharedBufferPool()), ); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } From fe1294f584874f68b58cd889c3013fc784acd995 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Sat, 20 May 2023 15:19:57 +0900 Subject: [PATCH 31/36] rpc_util: merge fallback bytespool functionality into bytepool --- shared_buffer_pool.go | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 386cbc3119cf..f246a7b949b3 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -54,7 +54,7 @@ func NewSharedBufferPool() SharedBufferPool { newBytesPool(level2PoolMaxSize), newBytesPool(level3PoolMaxSize), newBytesPool(level4PoolMaxSize), - newFallbackBytesPool(), + newBytesPool(0), }, } } @@ -114,46 +114,33 @@ type simpleSharedBufferChildPool interface { type bufferPool struct { sync.Pool + + defaultSize int } func (p *bufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) - return (*bs)[:size] -} - -func newBytesPool(size int) simpleSharedBufferChildPool { - return &bufferPool{ - sync.Pool{ - New: func() interface{} { - bs := make([]byte, size) - return &bs - }, - }, - } -} - -type fallbackBufferPool struct { - sync.Pool -} - -func (p *fallbackBufferPool) Get(size int) []byte { - bs := p.Pool.Get().(*[]byte) + // If default size is 0. It means this pool is fallback pool. + // Therefore, we need to make a new one if the requested size is larger than the buffer. + if p.defaultSize == 0 { if cap(*bs) < size { *bs = make([]byte, size) - return *bs + } } return (*bs)[:size] } -func newFallbackBytesPool() simpleSharedBufferChildPool { - return &fallbackBufferPool{ - sync.Pool{ +func newBytesPool(size int) simpleSharedBufferChildPool { + return &bufferPool{ + Pool: sync.Pool{ New: func() interface{} { - return new([]byte) + bs := make([]byte, size) + return &bs }, }, + defaultSize: size, } } From 55354169ca42b4f3abbbf78debe9a89c4cd0ac1f Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 24 May 2023 08:05:23 +0900 Subject: [PATCH 32/36] rpc_util: add capacity checks for all buffer pool sizes - It's not required, as it's guaranteed outside the pool, but it seems safer. --- shared_buffer_pool.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index f246a7b949b3..317244d8181b 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -121,12 +121,8 @@ type bufferPool struct { func (p *bufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) - // If default size is 0. It means this pool is fallback pool. - // Therefore, we need to make a new one if the requested size is larger than the buffer. - if p.defaultSize == 0 { - if cap(*bs) < size { - *bs = make([]byte, size) - } + if cap(*bs) < size { + *bs = make([]byte, size) } return (*bs)[:size] From 2678efb1a7844ea090938abdb1d3e0f7288f43b4 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Wed, 24 May 2023 08:21:29 +0900 Subject: [PATCH 33/36] rpc_util: return unused buffers to memory pool --- shared_buffer_pool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 317244d8181b..21f9517ee13e 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -122,7 +122,9 @@ func (p *bufferPool) Get(size int) []byte { bs := p.Pool.Get().(*[]byte) if cap(*bs) < size { - *bs = make([]byte, size) + p.Pool.Put(bs) + + return make([]byte, size) } return (*bs)[:size] From 63a360ebbaf2285355f48a2a1f0aeadc6b2b6b73 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Tue, 27 Jun 2023 20:13:42 +0900 Subject: [PATCH 34/36] rpc_util: remove a useless newline --- shared_buffer_pool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 21f9517ee13e..c3a5a9ac1f19 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -151,5 +151,4 @@ func (nopBufferPool) Get(length int) []byte { } func (nopBufferPool) Put(*[]byte) { - } From 1f4bc3550a6ef2915b5c9da55279d73c209575d7 Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Tue, 27 Jun 2023 21:25:18 +0900 Subject: [PATCH 35/36] rpc_util: standardize buffer pool interface to use []byte in Put method --- rpc_util.go | 2 +- shared_buffer_pool.go | 17 ++++++++--------- shared_buffer_pool_test.go | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index a844d28f49d0..01e9c60b807f 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -803,7 +803,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf if payInfo != nil { payInfo.uncompressedBytes = buf } else { - p.recvBufferPool.Put(&buf) + p.recvBufferPool.Put(buf) } return nil } diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index c3a5a9ac1f19..97af1857c296 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -35,7 +35,7 @@ type SharedBufferPool interface { Get(length int) []byte // Put returns a buffer to the pool. - Put(*[]byte) + Put([]byte) } // NewSharedBufferPool creates a simple SharedBufferPool with buckets @@ -68,8 +68,8 @@ func (p *simpleSharedBufferPool) Get(size int) []byte { return p.pools[p.poolIdx(size)].Get(size) } -func (p *simpleSharedBufferPool) Put(bs *[]byte) { - p.pools[p.poolIdx(cap(*bs))].Put(bs) +func (p *simpleSharedBufferPool) Put(bs []byte) { + p.pools[p.poolIdx(cap(bs))].Put(bs) } func (p *simpleSharedBufferPool) poolIdx(size int) int { @@ -119,23 +119,22 @@ type bufferPool struct { } func (p *bufferPool) Get(size int) []byte { - bs := p.Pool.Get().(*[]byte) + bs := p.Pool.Get().([]byte) - if cap(*bs) < size { + if cap(bs) < size { p.Pool.Put(bs) return make([]byte, size) } - return (*bs)[:size] + return (bs)[:size] } func newBytesPool(size int) simpleSharedBufferChildPool { return &bufferPool{ Pool: sync.Pool{ New: func() interface{} { - bs := make([]byte, size) - return &bs + return make([]byte, size) }, }, defaultSize: size, @@ -150,5 +149,5 @@ func (nopBufferPool) Get(length int) []byte { return make([]byte, length) } -func (nopBufferPool) Put(*[]byte) { +func (nopBufferPool) Put([]byte) { } diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go index f5ed7c8314f1..1ecda754dfdb 100644 --- a/shared_buffer_pool_test.go +++ b/shared_buffer_pool_test.go @@ -42,7 +42,7 @@ func (s) TestSharedBufferPool(t *testing.T) { t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) } - p.Put(&bs) + p.Put(bs) } } } From 3dcd8331869d75d5a00a63024aca8efd4346cdfc Mon Sep 17 00:00:00 2001 From: Jaewan Park Date: Tue, 27 Jun 2023 23:54:22 +0900 Subject: [PATCH 36/36] Revert "rpc_util: standardize buffer pool interface to use []byte in Put method" This reverts commit 1f4bc3550a6ef2915b5c9da55279d73c209575d7. --- rpc_util.go | 2 +- shared_buffer_pool.go | 17 +++++++++-------- shared_buffer_pool_test.go | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/rpc_util.go b/rpc_util.go index 01e9c60b807f..a844d28f49d0 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -803,7 +803,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf if payInfo != nil { payInfo.uncompressedBytes = buf } else { - p.recvBufferPool.Put(buf) + p.recvBufferPool.Put(&buf) } return nil } diff --git a/shared_buffer_pool.go b/shared_buffer_pool.go index 97af1857c296..c3a5a9ac1f19 100644 --- a/shared_buffer_pool.go +++ b/shared_buffer_pool.go @@ -35,7 +35,7 @@ type SharedBufferPool interface { Get(length int) []byte // Put returns a buffer to the pool. - Put([]byte) + Put(*[]byte) } // NewSharedBufferPool creates a simple SharedBufferPool with buckets @@ -68,8 +68,8 @@ func (p *simpleSharedBufferPool) Get(size int) []byte { return p.pools[p.poolIdx(size)].Get(size) } -func (p *simpleSharedBufferPool) Put(bs []byte) { - p.pools[p.poolIdx(cap(bs))].Put(bs) +func (p *simpleSharedBufferPool) Put(bs *[]byte) { + p.pools[p.poolIdx(cap(*bs))].Put(bs) } func (p *simpleSharedBufferPool) poolIdx(size int) int { @@ -119,22 +119,23 @@ type bufferPool struct { } func (p *bufferPool) Get(size int) []byte { - bs := p.Pool.Get().([]byte) + bs := p.Pool.Get().(*[]byte) - if cap(bs) < size { + if cap(*bs) < size { p.Pool.Put(bs) return make([]byte, size) } - return (bs)[:size] + return (*bs)[:size] } func newBytesPool(size int) simpleSharedBufferChildPool { return &bufferPool{ Pool: sync.Pool{ New: func() interface{} { - return make([]byte, size) + bs := make([]byte, size) + return &bs }, }, defaultSize: size, @@ -149,5 +150,5 @@ func (nopBufferPool) Get(length int) []byte { return make([]byte, length) } -func (nopBufferPool) Put([]byte) { +func (nopBufferPool) Put(*[]byte) { } diff --git a/shared_buffer_pool_test.go b/shared_buffer_pool_test.go index 1ecda754dfdb..f5ed7c8314f1 100644 --- a/shared_buffer_pool_test.go +++ b/shared_buffer_pool_test.go @@ -42,7 +42,7 @@ func (s) TestSharedBufferPool(t *testing.T) { t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) } - p.Put(bs) + p.Put(&bs) } } }