Skip to content

Commit

Permalink
grpc: Add useBytesPoolForParser option
Browse files Browse the repository at this point in the history
  • Loading branch information
hueypark committed Jan 31, 2023
1 parent cb8827d commit bae37e3
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
15 changes: 15 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type dialOptions struct {
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
useBytesPoolForParser bool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -635,3 +636,17 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
o.resolvers = append(o.resolvers, rs...)
})
}

// WithUseBytesPoolForParser returns a DialOption that specifies whether to use
// a bytes pool for parsing. Setting this to true will reduce the memory allocation
// in the parser.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithUseBytesPoolForParser(useBytesPoolForParser bool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.useBytesPoolForParser = useBytesPoolForParser
})
}
13 changes: 11 additions & 2 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ type parser struct {
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

// useBytesPool indicates whether to use bytes pool to allocate
useBytesPool bool
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -574,7 +577,11 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
msg = pool.Get(int(length))
if p.useBytesPool {
msg = pool.Get(int(length))
} else {
msg = make([]byte, int(length))
}
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand Down Expand Up @@ -764,7 +771,9 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
payInfo.uncompressedBytes = make([]byte, len(buf))
copy(payInfo.uncompressedBytes, buf)
}
pool.Put(&buf)
if p.useBytesPool {
pool.Put(&buf)
}
return nil
}

Expand Down
14 changes: 14 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type serverOptions struct {
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
useBytesPoolForParser bool
}

var defaultServerOptions = serverOptions{
Expand Down Expand Up @@ -552,6 +553,19 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}

// UseBytesPoolForParser returns a ServerOption that sets whether to use a bytes pool
// for the parser. Setting this to true will reduce the memory allocation in the parser.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func UseBytesPoolForParser(useBytesPoolForParser bool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.useBytesPoolForParser = useBytesPoolForParser
})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (a *csAttempt) newStream() error {
return toRPCErr(nse.Err)
}
a.s = s
a.p = &parser{r: s}
a.p = &parser{r: s, useBytesPool: a.cs.cc.dopts.useBytesPoolForParser}
return nil
}

Expand Down Expand Up @@ -1249,7 +1249,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
as.p = &parser{r: s}
as.p = &parser{r: s, useBytesPool: ac.dopts.useBytesPoolForParser}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
Expand Down

0 comments on commit bae37e3

Please sign in to comment.