diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 5e2caf5d204d..d9d1e6d6a8cf 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -106,6 +106,10 @@ var ( useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+ "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.") + clientReadBufferSize = flags.IntSlice("clientReadBufferSize", []int{-1}, "Configures the client read buffer size in bytes. If negative, use the default - may be a a comma-separated list") + clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list") + serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list") + serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list") logger = grpclog.Component("benchmark") ) @@ -306,6 +310,19 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) { }), ) } + if bf.ClientReadBufferSize >= 0 { + opts = append(opts, grpc.WithReadBufferSize(bf.ClientReadBufferSize)) + } + if bf.ClientWriteBufferSize >= 0 { + opts = append(opts, grpc.WithWriteBufferSize(bf.ClientWriteBufferSize)) + } + if bf.ServerReadBufferSize >= 0 { + sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize)) + } + if bf.ServerWriteBufferSize >= 0 { + sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize)) + } + sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -496,18 +513,22 @@ type benchOpts struct { // features through command line flags. We generate all possible combinations // for the provided values and run the benchmarks for each combination. type featureOpts struct { - enableTrace []bool - readLatencies []time.Duration - readKbps []int - readMTU []int - maxConcurrentCalls []int - reqSizeBytes []int - respSizeBytes []int - reqPayloadCurves []*stats.PayloadCurve - respPayloadCurves []*stats.PayloadCurve - compModes []string - enableChannelz []bool - enablePreloader []bool + enableTrace []bool + readLatencies []time.Duration + readKbps []int + readMTU []int + maxConcurrentCalls []int + reqSizeBytes []int + respSizeBytes []int + reqPayloadCurves []*stats.PayloadCurve + respPayloadCurves []*stats.PayloadCurve + compModes []string + enableChannelz []bool + enablePreloader []bool + clientReadBufferSize []int + clientWriteBufferSize []int + serverReadBufferSize []int + serverWriteBufferSize []int } // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each @@ -544,6 +565,14 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.enableChannelz) case stats.EnablePreloaderIndex: featuresNum[i] = len(b.features.enablePreloader) + case stats.ClientReadBufferSize: + featuresNum[i] = len(b.features.clientReadBufferSize) + case stats.ClientWriteBufferSize: + featuresNum[i] = len(b.features.clientWriteBufferSize) + case stats.ServerReadBufferSize: + featuresNum[i] = len(b.features.serverReadBufferSize) + case stats.ServerWriteBufferSize: + featuresNum[i] = len(b.features.serverWriteBufferSize) default: log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } @@ -598,14 +627,18 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { EnableKeepalive: b.enableKeepalive, BenchTime: b.benchTime, // These features can potentially change for each iteration. - EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]], - Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]], - Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]], - MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]], - MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], - ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], - EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], - EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], + EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]], + Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]], + Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]], + MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]], + MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], + ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], + EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], + EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], + ClientReadBufferSize: b.features.clientReadBufferSize[curPos[stats.ClientReadBufferSize]], + ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]], + ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]], + ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]], } if len(b.features.reqPayloadCurves) == 0 { f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] @@ -662,16 +695,20 @@ func processFlags() *benchOpts { useBufconn: *useBufconn, enableKeepalive: *enableKeepalive, features: &featureOpts{ - enableTrace: setToggleMode(*traceMode), - readLatencies: append([]time.Duration(nil), *readLatency...), - readKbps: append([]int(nil), *readKbps...), - readMTU: append([]int(nil), *readMTU...), - maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...), - reqSizeBytes: append([]int(nil), *readReqSizeBytes...), - respSizeBytes: append([]int(nil), *readRespSizeBytes...), - compModes: setCompressorMode(*compressorMode), - enableChannelz: setToggleMode(*channelzOn), - enablePreloader: setToggleMode(*preloaderMode), + enableTrace: setToggleMode(*traceMode), + readLatencies: append([]time.Duration(nil), *readLatency...), + readKbps: append([]int(nil), *readKbps...), + readMTU: append([]int(nil), *readMTU...), + maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...), + reqSizeBytes: append([]int(nil), *readReqSizeBytes...), + respSizeBytes: append([]int(nil), *readRespSizeBytes...), + compModes: setCompressorMode(*compressorMode), + enableChannelz: setToggleMode(*channelzOn), + enablePreloader: setToggleMode(*preloaderMode), + clientReadBufferSize: append([]int(nil), *clientReadBufferSize...), + clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...), + serverReadBufferSize: append([]int(nil), *serverReadBufferSize...), + serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...), }, } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index 6275c3c3a71c..7f4db236277e 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -52,6 +52,10 @@ const ( CompModesIndex EnableChannelzIndex EnablePreloaderIndex + ClientReadBufferSize + ClientWriteBufferSize + ServerReadBufferSize + ServerWriteBufferSize // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -109,6 +113,14 @@ type Features struct { EnableChannelz bool // EnablePreloader indicates if preloading was turned on. EnablePreloader bool + // ClientReadBufferSize is the size of the client read buffer in bytes. If negative, use the default buffer size. + ClientReadBufferSize int + // ClientWriteBufferSize is the size of the client write buffer in bytes. If negative, use the default buffer size. + ClientWriteBufferSize int + // ServerReadBufferSize is the size of the server read buffer in bytes. If negative, use the default buffer size. + ServerReadBufferSize int + // ServerWriteBufferSize is the size of the server write buffer in bytes. If negative, use the default buffer size. + ServerWriteBufferSize int } // String returns all the feature values as a string. @@ -126,10 +138,13 @@ func (f Features) String() string { } return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+ "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ - "compressor_%v-channelz_%v-preloader_%v", + "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+ + "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, - respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader) + respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader, + f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize, + f.ServerWriteBufferSize) } // SharedFeatures returns the shared features as a pretty printable string. @@ -193,6 +208,14 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim)) case EnablePreloaderIndex: b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim)) + case ClientReadBufferSize: + b.WriteString(fmt.Sprintf("ClientReadBufferSize%v%v%v", sep, f.ClientWriteBufferSize, delim)) + case ClientWriteBufferSize: + b.WriteString(fmt.Sprintf("ClientWriteBufferSize%v%v%v", sep, f.ClientWriteBufferSize, delim)) + case ServerReadBufferSize: + b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim)) + case ServerWriteBufferSize: + b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim)) default: log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) }