Skip to content

Commit db32c5b

Browse files
Fix preloader mode in benchmarks (#6359)
1 parent f0280f9 commit db32c5b

File tree

2 files changed

+106
-40
lines changed

2 files changed

+106
-40
lines changed

benchmark/benchmain/main.go

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"reflect"
5454
"runtime"
5555
"runtime/pprof"
56+
"strconv"
5657
"strings"
5758
"sync"
5859
"sync/atomic"
@@ -81,7 +82,8 @@ var (
8182
traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
8283
fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
8384
preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
84-
fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
85+
fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode",
86+
strings.Join(allToggleModes, ", ")), allToggleModes)
8587
channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
8688
fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
8789
compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
@@ -401,20 +403,11 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
401403
}
402404

403405
func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
404-
clients, cleanup := makeClients(bf)
406+
streams, req, cleanup := setupStream(bf, false)
405407

406-
streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
407-
for cn := 0; cn < bf.Connections; cn++ {
408-
tc := clients[cn]
409-
streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
410-
for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
411-
412-
stream, err := tc.StreamingCall(context.Background())
413-
if err != nil {
414-
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
415-
}
416-
streams[cn][pos] = stream
417-
}
408+
var preparedMsg [][]*grpc.PreparedMsg
409+
if bf.EnablePreloader {
410+
preparedMsg = prepareMessages(streams, req)
418411
}
419412

420413
return func(cn, pos int) {
@@ -426,24 +419,25 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
426419
if bf.RespPayloadCurve != nil {
427420
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
428421
}
429-
streamCaller(streams[cn][pos], reqSizeBytes, respSizeBytes)
422+
var req interface{}
423+
if bf.EnablePreloader {
424+
req = preparedMsg[cn][pos]
425+
} else {
426+
pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, reqSizeBytes)
427+
req = &testpb.SimpleRequest{
428+
ResponseType: pl.Type,
429+
ResponseSize: int32(respSizeBytes),
430+
Payload: pl,
431+
}
432+
}
433+
streamCaller(streams[cn][pos], req)
430434
}, cleanup
431435
}
432436

433437
func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
434-
streams, req, cleanup := setupUnconstrainedStream(bf)
438+
streams, req, cleanup := setupStream(bf, true)
435439

436-
preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
437-
for cn, connStreams := range streams {
438-
preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
439-
for pos, stream := range connStreams {
440-
preparedMsg[cn][pos] = &grpc.PreparedMsg{}
441-
err := preparedMsg[cn][pos].Encode(stream, req)
442-
if err != nil {
443-
logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
444-
}
445-
}
446-
}
440+
preparedMsg := prepareMessages(streams, req)
447441

448442
return func(cn, pos int) {
449443
streams[cn][pos].SendMsg(preparedMsg[cn][pos])
@@ -453,7 +447,7 @@ func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRe
453447
}
454448

455449
func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
456-
streams, req, cleanup := setupUnconstrainedStream(bf)
450+
streams, req, cleanup := setupStream(bf, true)
457451

458452
return func(cn, pos int) {
459453
streams[cn][pos].Send(req)
@@ -462,13 +456,19 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r
462456
}, cleanup
463457
}
464458

465-
func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
459+
func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
466460
clients, cleanup := makeClients(bf)
467461

468462
streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
469-
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1",
470-
benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
471-
ctx := metadata.NewOutgoingContext(context.Background(), md)
463+
ctx := context.Background()
464+
if unconstrained {
465+
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
466+
ctx = metadata.NewOutgoingContext(ctx, md)
467+
}
468+
if bf.EnablePreloader {
469+
md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
470+
ctx = metadata.NewOutgoingContext(ctx, md)
471+
}
472472
for cn := 0; cn < bf.Connections; cn++ {
473473
tc := clients[cn]
474474
streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
@@ -491,6 +491,20 @@ func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_
491491
return streams, req, cleanup
492492
}
493493

494+
func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg {
495+
preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
496+
for cn, connStreams := range streams {
497+
preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
498+
for pos, stream := range connStreams {
499+
preparedMsg[cn][pos] = &grpc.PreparedMsg{}
500+
if err := preparedMsg[cn][pos].Encode(stream, req); err != nil {
501+
logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
502+
}
503+
}
504+
}
505+
return preparedMsg
506+
}
507+
494508
// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
495509
// request and response sizes.
496510
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
@@ -499,8 +513,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int)
499513
}
500514
}
501515

502-
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
503-
if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
516+
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) {
517+
if err := bm.DoStreamingRoundTripPreloaded(stream, req); err != nil {
504518
logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
505519
}
506520
}
@@ -790,6 +804,9 @@ func processFlags() *benchOpts {
790804
if len(opts.features.reqSizeBytes) != 0 {
791805
log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
792806
}
807+
if len(opts.features.enablePreloader) != 0 {
808+
log.Fatalf("you may not specify -reqPayloadCurveFiles and -preloader at the same time")
809+
}
793810
for _, file := range *reqPayloadCurveFiles {
794811
pc, err := stats.NewPayloadCurve(file)
795812
if err != nil {
@@ -807,6 +824,9 @@ func processFlags() *benchOpts {
807824
if len(opts.features.respSizeBytes) != 0 {
808825
log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
809826
}
827+
if len(opts.features.enablePreloader) != 0 {
828+
log.Fatalf("you may not specify -respPayloadCurveFiles and -preloader at the same time")
829+
}
810830
for _, file := range *respPayloadCurveFiles {
811831
pc, err := stats.NewPayloadCurve(file)
812832
if err != nil {

benchmark/benchmark.go

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"log"
2929
"math/rand"
3030
"net"
31+
"strconv"
3132
"time"
3233

3334
"google.golang.org/grpc"
@@ -83,13 +84,35 @@ const UnconstrainedStreamingHeader = "unconstrained-streaming"
8384
// the server should sleep between consecutive RPC responses.
8485
const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay"
8586

87+
// PreloadMsgSizeHeader indicates that the client is going to ask for
88+
// a fixed response size and passes this size to the server.
89+
// The server is expected to preload the response on startup.
90+
const PreloadMsgSizeHeader = "preload-msg-size"
91+
8692
func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
93+
preloadMsgSize := 0
94+
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[PreloadMsgSizeHeader]) != 0 {
95+
val := md[PreloadMsgSizeHeader][0]
96+
var err error
97+
preloadMsgSize, err = strconv.Atoi(val)
98+
if err != nil {
99+
return fmt.Errorf("%q header value is not an integer: %s", PreloadMsgSizeHeader, err)
100+
}
101+
}
102+
87103
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
88-
return s.UnconstrainedStreamingCall(stream)
104+
return s.UnconstrainedStreamingCall(stream, preloadMsgSize)
89105
}
90106
response := &testpb.SimpleResponse{
91107
Payload: new(testpb.Payload),
92108
}
109+
preloadedResponse := &grpc.PreparedMsg{}
110+
if preloadMsgSize > 0 {
111+
setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize)
112+
if err := preloadedResponse.Encode(stream, response); err != nil {
113+
return err
114+
}
115+
}
93116
in := new(testpb.SimpleRequest)
94117
for {
95118
// use ServerStream directly to reuse the same testpb.SimpleRequest object
@@ -101,14 +124,19 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal
101124
if err != nil {
102125
return err
103126
}
104-
setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
105-
if err := stream.Send(response); err != nil {
127+
if preloadMsgSize > 0 {
128+
err = stream.SendMsg(preloadedResponse)
129+
} else {
130+
setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
131+
err = stream.Send(response)
132+
}
133+
if err != nil {
106134
return err
107135
}
108136
}
109137
}
110138

111-
func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
139+
func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer, preloadMsgSize int) error {
112140
maxSleep := 0
113141
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 {
114142
val := md[UnconstrainedStreamingDelayHeader][0]
@@ -135,6 +163,13 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService
135163
}
136164
setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
137165

166+
preloadedResponse := &grpc.PreparedMsg{}
167+
if preloadMsgSize > 0 {
168+
if err := preloadedResponse.Encode(stream, response); err != nil {
169+
return err
170+
}
171+
}
172+
138173
go func() {
139174
for {
140175
// Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
@@ -154,7 +189,12 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService
154189
if maxSleep > 0 {
155190
time.Sleep(time.Duration(rand.Intn(maxSleep)))
156191
}
157-
err := stream.Send(response)
192+
var err error
193+
if preloadMsgSize > 0 {
194+
err = stream.SendMsg(preloadedResponse)
195+
} else {
196+
err = stream.Send(response)
197+
}
158198
switch status.Code(err) {
159199
case codes.Unavailable, codes.Canceled:
160200
return
@@ -258,7 +298,13 @@ func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient,
258298
ResponseSize: int32(respSize),
259299
Payload: pl,
260300
}
261-
if err := stream.Send(req); err != nil {
301+
return DoStreamingRoundTripPreloaded(stream, req)
302+
}
303+
304+
// DoStreamingRoundTripPreloaded performs a round trip for a single streaming rpc with preloaded payload.
305+
func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) error {
306+
// req could be either *testpb.SimpleRequest or *grpc.PreparedMsg
307+
if err := stream.SendMsg(req); err != nil {
262308
return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
263309
}
264310
if _, err := stream.Recv(); err != nil {

0 commit comments

Comments
 (0)