Skip to content

Commit

Permalink
rpc_util: exclude shared prefix from shared recv buffer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
hueypark committed Apr 19, 2023
1 parent ca9f6de commit 9be7889
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 47 deletions.
40 changes: 20 additions & 20 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var (
clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
sharedRecvBufferPool = flags.StringWithAllowedValues("sharedRecvBufferPool", sharedRecvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allSharedRecvBufferPools)
recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)

logger = grpclog.Component("benchmark")
)
Expand All @@ -135,9 +135,9 @@ const (
networkModeWAN = "WAN"
networkLongHaul = "Longhaul"
// Shared recv buffer pool
sharedRecvBufferPoolNil = "nil"
sharedRecvBufferPoolSimple = "simple"
sharedRecvBufferPoolAll = "all"
recvBufferPoolNil = "nil"
recvBufferPoolSimple = "simple"
recvBufferPoolAll = "all"

numStatsBuckets = 10
warmupCallCount = 10
Expand All @@ -149,7 +149,7 @@ var (
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
allSharedRecvBufferPools = []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple, sharedRecvBufferPoolAll}
allRecvBufferPools = []string{recvBufferPoolNil, recvBufferPoolSimple, recvBufferPoolAll}
defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
defaultReadKbps = []int{0, 10240} // if non-positive, infinite
defaultReadMTU = []int{0} // if non-positive, infinite
Expand Down Expand Up @@ -327,14 +327,14 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
if bf.ServerWriteBufferSize >= 0 {
sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
}
switch bf.SharedRecvBufferPool {
case sharedRecvBufferPoolNil:
switch bf.RecvBufferPool {
case recvBufferPoolNil:
// Do nothing.
case sharedRecvBufferPoolSimple:
opts = append(opts, grpc.WithSharedRecvBufferPool(grpc.NewSimpleSharedBufferPool()))
sopts = append(sopts, grpc.SharedRecvBufferPool(grpc.NewSimpleSharedBufferPool()))
case recvBufferPoolSimple:
opts = append(opts, grpc.WithRecvBufferPool(grpc.NewSimpleSharedBufferPool()))
sopts = append(sopts, grpc.RecvBufferPool(grpc.NewSimpleSharedBufferPool()))
default:
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.SharedRecvBufferPool)
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
}

sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
Expand Down Expand Up @@ -543,7 +543,7 @@ type featureOpts struct {
clientWriteBufferSize []int
serverReadBufferSize []int
serverWriteBufferSize []int
sharedRecvBufferPools []string
recvBufferPools []string
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
Expand Down Expand Up @@ -588,8 +588,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.serverReadBufferSize)
case stats.ServerWriteBufferSize:
featuresNum[i] = len(b.features.serverWriteBufferSize)
case stats.SharedRecvBufferPool:
featuresNum[i] = len(b.features.sharedRecvBufferPools)
case stats.RecvBufferPool:
featuresNum[i] = len(b.features.recvBufferPools)
default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
}
Expand Down Expand Up @@ -656,7 +656,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
SharedRecvBufferPool: b.features.sharedRecvBufferPools[curPos[stats.SharedRecvBufferPool]],
RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]],
}
if len(b.features.reqPayloadCurves) == 0 {
f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
Expand Down Expand Up @@ -727,7 +727,7 @@ func processFlags() *benchOpts {
clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
serverReadBufferSize: append([]int(nil), *serverReadBufferSize...),
serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
sharedRecvBufferPools: setSharedRecvBufferPool(*sharedRecvBufferPool),
recvBufferPools: setRecvBufferPool(*recvBufferPool),
},
}

Expand Down Expand Up @@ -803,12 +803,12 @@ func setCompressorMode(val string) []string {
}
}

func setSharedRecvBufferPool(val string) []string {
func setRecvBufferPool(val string) []string {
switch val {
case sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple:
case recvBufferPoolNil, recvBufferPoolSimple:
return []string{val}
case sharedRecvBufferPoolAll:
return []string{sharedRecvBufferPoolNil, sharedRecvBufferPoolSimple}
case recvBufferPoolAll:
return []string{recvBufferPoolNil, recvBufferPoolSimple}
default:
// This should never happen because a wrong value passed to this flag would
// be caught during flag.Parse().
Expand Down
14 changes: 7 additions & 7 deletions benchmark/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
ClientWriteBufferSize
ServerReadBufferSize
ServerWriteBufferSize
SharedRecvBufferPool
RecvBufferPool

// MaxFeatureIndex is a place holder to indicate the total number of feature
// indices we have. Any new feature indices should be added above this.
Expand Down Expand Up @@ -122,8 +122,8 @@ type Features struct {
ServerReadBufferSize int
// ServerWriteBufferSize is the size of the server write buffer in bytes. If negative, use the default buffer size.
ServerWriteBufferSize int
// SharedRecvBufferPool represents the shared recv buffer pool used.
SharedRecvBufferPool string
// RecvBufferPool represents the shared recv buffer pool used.
RecvBufferPool string
}

// String returns all the feature values as a string.
Expand All @@ -143,12 +143,12 @@ func (f Features) String() string {
"trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
"compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+
"clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+
"sharedRecvBufferPool_%v-",
"recvBufferPool_%v-",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader,
f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize,
f.ServerWriteBufferSize, f.SharedRecvBufferPool)
f.ServerWriteBufferSize, f.RecvBufferPool)
}

// SharedFeatures returns the shared features as a pretty printable string.
Expand Down Expand Up @@ -220,8 +220,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim
b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim))
case ServerWriteBufferSize:
b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim))
case SharedRecvBufferPool:
b.WriteString(fmt.Sprintf("SharedRecvBufferPool%v%v%v", sep, f.SharedRecvBufferPool, delim))
case RecvBufferPool:
b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim))
default:
log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex)
}
Expand Down
10 changes: 5 additions & 5 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type dialOptions struct {
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
sharedRecvBufferPool SharedBufferPool
recvBufferPool SharedBufferPool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -628,7 +628,7 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize,
UseProxy: true,
},
sharedRecvBufferPool: nopBufferPool{},
recvBufferPool: nopBufferPool{},
}
}

Expand Down Expand Up @@ -658,7 +658,7 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
})
}

// WithSharedRecvBufferPool returns a DialOption that configures the ClientConn
// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
//
Expand All @@ -673,8 +673,8 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithSharedRecvBufferPool(bufferPool SharedBufferPool) DialOption {
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.sharedRecvBufferPool = bufferPool
o.recvBufferPool = bufferPool
})
}
8 changes: 4 additions & 4 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,8 @@ type parser struct {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

// sharedRecvBufferPool is the pool of shared receive buffers.
sharedRecvBufferPool SharedBufferPool
// recvBufferPool is the pool of shared receive buffers.
recvBufferPool SharedBufferPool
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -613,7 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
msg = p.sharedRecvBufferPool.Get(int(length))
msg = p.recvBufferPool.Get(int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand Down Expand Up @@ -803,7 +803,7 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
if payInfo != nil {
payInfo.uncompressedBytes = buf
} else {
p.sharedRecvBufferPool.Put(&buf)
p.recvBufferPool.Put(&buf)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s) TestSimpleParsing(t *testing.T) {
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := fullReader{bytes.NewReader(test.p)}
parser := &parser{r: buf, sharedRecvBufferPool: nopBufferPool{}}
parser := &parser{r: buf, recvBufferPool: nopBufferPool{}}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
Expand All @@ -77,7 +77,7 @@ func (s) TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := fullReader{bytes.NewReader(p)}
parser := &parser{r: b, sharedRecvBufferPool: nopBufferPool{}}
parser := &parser{r: b, recvBufferPool: nopBufferPool{}}

wantRecvs := []struct {
pt payloadFormat
Expand Down
14 changes: 7 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type serverOptions struct {
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
sharedRecvBufferPool SharedBufferPool
recvBufferPool SharedBufferPool
}

var defaultServerOptions = serverOptions{
Expand All @@ -183,7 +183,7 @@ var defaultServerOptions = serverOptions{
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
sharedRecvBufferPool: nopBufferPool{},
recvBufferPool: nopBufferPool{},
}
var globalServerOptions []ServerOption

Expand Down Expand Up @@ -554,7 +554,7 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}

// SharedRecvBufferPool returns a ServerOption that configures the server
// RecvBufferPool returns a ServerOption that configures the server
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
//
Expand All @@ -569,9 +569,9 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedRecvBufferPool(bufferPool SharedBufferPool) ServerOption {
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.sharedRecvBufferPool = bufferPool
o.recvBufferPool = bufferPool
})
}

Expand Down Expand Up @@ -1319,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
Expand Down Expand Up @@ -1529,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ctx: ctx,
t: t,
s: stream,
p: &parser{r: stream, sharedRecvBufferPool: s.opts.sharedRecvBufferPool},
p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (a *csAttempt) newStream() error {
return toRPCErr(nse.Err)
}
a.s = s
a.p = &parser{r: s, sharedRecvBufferPool: a.cs.cc.dopts.sharedRecvBufferPool}
a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}
return nil
}

Expand Down Expand Up @@ -1262,7 +1262,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
as.p = &parser{r: s, sharedRecvBufferPool: ac.dopts.sharedRecvBufferPool}
as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
Expand Down

0 comments on commit 9be7889

Please sign in to comment.