diff --git a/dialoptions.go b/dialoptions.go index 4866da101c60..025cca665243 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -76,6 +76,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder + useBytesPoolForParser bool } // DialOption configures how we set up the connection. @@ -635,3 +636,17 @@ func WithResolvers(rs ...resolver.Builder) DialOption { o.resolvers = append(o.resolvers, rs...) }) } + +// WithUseBytesPoolForParser returns a DialOption that specifies whether to use +// a bytes pool for parsing. Setting this to true will reduce the memory allocation +// in the parser. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithUseBytesPoolForParser(useBytesPoolForParser bool) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.useBytesPoolForParser = useBytesPoolForParser + }) +} diff --git a/rpc_util.go b/rpc_util.go index 5d0a41136389..2acd01d90e64 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -541,6 +541,9 @@ type parser struct { // The header of a gRPC message. Find more detail at // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte + + // useBytesPool indicates whether to use bytes pool to allocate + useBytesPool bool } // recvMsg reads a complete gRPC message from the stream. @@ -574,7 +577,11 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - msg = pool.Get(int(length)) + if p.useBytesPool { + msg = pool.Get(int(length)) + } else { + msg = make([]byte, int(length)) + } if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -764,7 +771,9 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf payInfo.uncompressedBytes = make([]byte, len(buf)) copy(payInfo.uncompressedBytes, buf) } - pool.Put(&buf) + if p.useBytesPool { + pool.Put(&buf) + } return nil } diff --git a/server.go b/server.go index d5a6e78be44d..a5bd2dbfac7a 100644 --- a/server.go +++ b/server.go @@ -174,6 +174,7 @@ type serverOptions struct { maxHeaderListSize *uint32 headerTableSize *uint32 numServerWorkers uint32 + useBytesPoolForParser bool } var defaultServerOptions = serverOptions{ @@ -552,6 +553,19 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { }) } +// UseBytesPoolForParser returns a ServerOption that sets whether to use a bytes pool +// for the parser. Setting this to true will reduce the memory allocation in the parser. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func UseBytesPoolForParser(useBytesPoolForParser bool) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.useBytesPoolForParser = useBytesPoolForParser + }) +} + // serverWorkerResetThreshold defines how often the stack must be reset. Every // N requests, by spawning a new goroutine in its place, a worker can reset its // stack so that large stacks don't live in memory forever. 2^16 should allow diff --git a/stream.go b/stream.go index 93231af2ac56..57c709e89aec 100644 --- a/stream.go +++ b/stream.go @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error { return toRPCErr(nse.Err) } a.s = s - a.p = &parser{r: s} + a.p = &parser{r: s, useBytesPool: a.cs.cc.dopts.useBytesPoolForParser} return nil } @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.s = s - as.p = &parser{r: s} + as.p = &parser{r: s, useBytesPool: ac.dopts.useBytesPoolForParser} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the