Skip to content

Commit f4e4574

Browse files
committed
add test
1 parent e28bf88 commit f4e4574

File tree

5 files changed

+177
-46
lines changed

5 files changed

+177
-46
lines changed

benchmark/primitives/primitives_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,3 +425,42 @@ func BenchmarkRLockUnlock(b *testing.B) {
425425
}
426426
})
427427
}
428+
429+
type ifNop interface {
430+
nop()
431+
}
432+
433+
type alwaysNop struct{}
434+
435+
func (alwaysNop) nop() {}
436+
437+
type concreteNop struct {
438+
isNop atomic.Bool
439+
i int
440+
}
441+
442+
func (c *concreteNop) nop() {
443+
if c.isNop.Load() {
444+
return
445+
}
446+
c.i++
447+
}
448+
449+
func BenchmarkInterfaceNop(b *testing.B) {
450+
n := ifNop(alwaysNop{})
451+
b.RunParallel(func(pb *testing.PB) {
452+
for pb.Next() {
453+
n.nop()
454+
}
455+
})
456+
}
457+
458+
func BenchmarkConcreteNop(b *testing.B) {
459+
n := &concreteNop{}
460+
n.isNop.Store(true)
461+
b.RunParallel(func(pb *testing.PB) {
462+
for pb.Next() {
463+
n.nop()
464+
}
465+
})
466+
}

internal/transport/http2_server.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
171171
ID: http2.SettingMaxFrameSize,
172172
Val: http2MaxFrameLen,
173173
}}
174-
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
175-
// permitted in the HTTP2 spec.
176-
maxStreams := config.MaxStreams
177-
if maxStreams == 0 {
178-
maxStreams = math.MaxUint32
179-
} else {
174+
if config.MaxStreams != math.MaxUint32 {
180175
isettings = append(isettings, http2.Setting{
181176
ID: http2.SettingMaxConcurrentStreams,
182-
Val: maxStreams,
177+
Val: config.MaxStreams,
183178
})
184179
}
185180
dynamicWindow := true
@@ -258,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
258253
framer: framer,
259254
readerDone: make(chan struct{}),
260255
writerDone: make(chan struct{}),
261-
maxStreams: maxStreams,
256+
maxStreams: config.MaxStreams,
262257
inTapHandle: config.InTapHandle,
263258
fc: &trInFlow{limit: uint32(icwz)},
264259
state: reachable,

internal/transport/transport_test.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,9 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT
337337
return
338338
}
339339
rawConn := conn
340+
if serverConfig.MaxStreams == 0 {
341+
serverConfig.MaxStreams = math.MaxUint32
342+
}
340343
transport, err := NewServerTransport(conn, serverConfig)
341344
if err != nil {
342345
return
@@ -425,8 +428,8 @@ func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server
425428
return server
426429
}
427430

428-
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2Client, func()) {
429-
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
431+
func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) {
432+
return setUpWithOptions(t, port, &ServerConfig{}, ht, ConnectOptions{})
430433
}
431434

432435
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
@@ -521,7 +524,7 @@ func (s) TestInflightStreamClosing(t *testing.T) {
521524

522525
// Tests that when streamID > MaxStreamID, the current client transport drains.
523526
func (s) TestClientTransportDrainsAfterStreamIDExhausted(t *testing.T) {
524-
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
527+
server, ct, cancel := setUp(t, 0, normal)
525528
defer cancel()
526529
defer server.stop()
527530
callHdr := &CallHdr{
@@ -566,7 +569,7 @@ func (s) TestClientTransportDrainsAfterStreamIDExhausted(t *testing.T) {
566569
}
567570

568571
func (s) TestClientSendAndReceive(t *testing.T) {
569-
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
572+
server, ct, cancel := setUp(t, 0, normal)
570573
defer cancel()
571574
callHdr := &CallHdr{
572575
Host: "localhost",
@@ -606,7 +609,7 @@ func (s) TestClientSendAndReceive(t *testing.T) {
606609
}
607610

608611
func (s) TestClientErrorNotify(t *testing.T) {
609-
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
612+
server, ct, cancel := setUp(t, 0, normal)
610613
defer cancel()
611614
go server.stop()
612615
// ct.reader should detect the error and activate ct.Error().
@@ -640,7 +643,7 @@ func performOneRPC(ct ClientTransport) {
640643
}
641644

642645
func (s) TestClientMix(t *testing.T) {
643-
s, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
646+
s, ct, cancel := setUp(t, 0, normal)
644647
defer cancel()
645648
time.AfterFunc(time.Second, s.stop)
646649
go func(ct ClientTransport) {
@@ -654,7 +657,7 @@ func (s) TestClientMix(t *testing.T) {
654657
}
655658

656659
func (s) TestLargeMessage(t *testing.T) {
657-
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
660+
server, ct, cancel := setUp(t, 0, normal)
658661
defer cancel()
659662
callHdr := &CallHdr{
660663
Host: "localhost",
@@ -789,7 +792,7 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) {
789792
// proceed until they complete naturally, while not allowing creation of new
790793
// streams during this window.
791794
func (s) TestGracefulClose(t *testing.T) {
792-
server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
795+
server, ct, cancel := setUp(t, 0, pingpong)
793796
defer cancel()
794797
defer func() {
795798
// Stop the server's listener to make the server's goroutines terminate
@@ -855,7 +858,7 @@ func (s) TestGracefulClose(t *testing.T) {
855858
}
856859

857860
func (s) TestLargeMessageSuspension(t *testing.T) {
858-
server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended)
861+
server, ct, cancel := setUp(t, 0, suspended)
859862
defer cancel()
860863
callHdr := &CallHdr{
861864
Host: "localhost",
@@ -963,7 +966,7 @@ func (s) TestMaxStreams(t *testing.T) {
963966
}
964967

965968
func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
966-
server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended)
969+
server, ct, cancel := setUp(t, 0, suspended)
967970
defer cancel()
968971
callHdr := &CallHdr{
969972
Host: "localhost",
@@ -1435,7 +1438,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
14351438
var encodingTestStatus = status.New(codes.Internal, "\n")
14361439

14371440
func (s) TestEncodingRequiredStatus(t *testing.T) {
1438-
server, ct, cancel := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
1441+
server, ct, cancel := setUp(t, 0, encodingRequiredStatus)
14391442
defer cancel()
14401443
callHdr := &CallHdr{
14411444
Host: "localhost",
@@ -1463,7 +1466,7 @@ func (s) TestEncodingRequiredStatus(t *testing.T) {
14631466
}
14641467

14651468
func (s) TestInvalidHeaderField(t *testing.T) {
1466-
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
1469+
server, ct, cancel := setUp(t, 0, invalidHeaderField)
14671470
defer cancel()
14681471
callHdr := &CallHdr{
14691472
Host: "localhost",
@@ -1485,7 +1488,7 @@ func (s) TestInvalidHeaderField(t *testing.T) {
14851488
}
14861489

14871490
func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) {
1488-
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
1491+
server, ct, cancel := setUp(t, 0, invalidHeaderField)
14891492
defer cancel()
14901493
defer server.stop()
14911494
defer ct.Close(fmt.Errorf("closed manually by test"))
@@ -2153,7 +2156,7 @@ func (s) TestPingPong1MB(t *testing.T) {
21532156

21542157
// This is a stress-test of flow control logic.
21552158
func runPingPongTest(t *testing.T, msgSize int) {
2156-
server, client, cancel := setUp(t, 0, 0, pingpong)
2159+
server, client, cancel := setUp(t, 0, pingpong)
21572160
defer cancel()
21582161
defer server.stop()
21592162
defer client.Close(fmt.Errorf("closed manually by test"))
@@ -2235,7 +2238,7 @@ func (s) TestHeaderTblSize(t *testing.T) {
22352238
}
22362239
}()
22372240

2238-
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
2241+
server, ct, cancel := setUp(t, 0, normal)
22392242
defer cancel()
22402243
defer ct.Close(fmt.Errorf("closed manually by test"))
22412244
defer server.stop()
@@ -2594,7 +2597,7 @@ func TestConnectionError_Unwrap(t *testing.T) {
25942597

25952598
func (s) TestPeerSetInServerContext(t *testing.T) {
25962599
// create client and server transports.
2597-
server, client, cancel := setUp(t, 0, math.MaxUint32, normal)
2600+
server, client, cancel := setUp(t, 0, normal)
25982601
defer cancel()
25992602
defer server.stop()
26002603
defer client.Close(fmt.Errorf("closed manually by test"))

server.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ type serverOptions struct {
173173
}
174174

175175
var defaultServerOptions = serverOptions{
176+
maxConcurrentStreams: math.MaxUint32,
176177
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
177178
maxSendMessageSize: defaultServerMaxSendMessageSize,
178179
connectionTimeout: 120 * time.Second,
@@ -398,6 +399,9 @@ func MaxSendMsgSize(m int) ServerOption {
398399
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
399400
// of concurrent streams to each ServerTransport.
400401
func MaxConcurrentStreams(n uint32) ServerOption {
402+
if n == 0 {
403+
n = math.MaxUint32
404+
}
401405
return newFuncServerOption(func(o *serverOptions) {
402406
o.maxConcurrentStreams = n
403407
})
@@ -2072,42 +2076,33 @@ func validateSendCompressor(name, clientCompressors string) error {
20722076
return fmt.Errorf("client does not support compressor %q", name)
20732077
}
20742078

2075-
type handlerQuota interface {
2076-
// acquire is called synchronously
2077-
acquire()
2078-
// release may be called asynchronously
2079-
release()
2080-
}
2081-
2082-
type atomicHandlerQuota struct {
2079+
// atomicSemaphore implements a blocking, counting semaphore. acquire should be
2080+
// called synchronously; release may be called asynchronously.
2081+
type atomicSemaphore struct {
20832082
n atomic.Int64
20842083
wait chan struct{}
20852084
}
20862085

2087-
func (q *atomicHandlerQuota) acquire() {
2086+
func (q *atomicSemaphore) acquire() {
20882087
if q.n.Add(-1) < 0 {
2089-
// Block until a release happens.
2088+
// We ran out of quota. Block until a release happens.
20902089
<-q.wait
20912090
}
20922091
}
20932092

2094-
func (q *atomicHandlerQuota) release() {
2095-
if q.n.Add(1) == 0 {
2093+
func (q *atomicSemaphore) release() {
2094+
// N.B. the "<= 0" check below should allow for this to work with multiple
2095+
// concurrent calls to acquire, but also note that with synchronous calls to
2096+
// acquire, as our system does, n will never be less than -1. There are
2097+
// fairness issues (queuing) to consider if this was to be generalized.
2098+
if q.n.Add(1) <= 0 {
20962099
// An acquire was waiting on us. Unblock it.
20972100
q.wait <- struct{}{}
20982101
}
20992102
}
21002103

2101-
type noHandlerQuota struct{}
2102-
2103-
func (noHandlerQuota) acquire() {}
2104-
func (noHandlerQuota) release() {}
2105-
2106-
func newHandlerQuota(n uint32) handlerQuota {
2107-
if n == 0 {
2108-
return noHandlerQuota{}
2109-
}
2110-
a := &atomicHandlerQuota{wait: make(chan struct{}, 1)}
2104+
func newHandlerQuota(n uint32) *atomicSemaphore {
2105+
a := &atomicSemaphore{wait: make(chan struct{}, 1)}
21112106
a.n.Store(int64(n))
21122107
return a
21132108
}

server_ext_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package grpc_test
20+
21+
import (
22+
"context"
23+
"io"
24+
"testing"
25+
"time"
26+
27+
"google.golang.org/grpc"
28+
"google.golang.org/grpc/internal/grpcsync"
29+
"google.golang.org/grpc/internal/stubserver"
30+
31+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
32+
)
33+
34+
// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
35+
// handlers are active at one time.
36+
func (s) TestServer_MaxHandlers(t *testing.T) {
37+
started := make(chan struct{})
38+
blockCalls := grpcsync.NewEvent()
39+
40+
// This stub server does not properly respect the stream context, so it will
41+
// not exit when the context is canceled.
42+
ss := stubserver.StubServer{
43+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
44+
started <- struct{}{}
45+
<-blockCalls.Done()
46+
return nil
47+
},
48+
}
49+
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
50+
t.Fatal("Error starting server:", err)
51+
}
52+
defer ss.Stop()
53+
54+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
55+
defer cancel()
56+
57+
// Start one RPC to the server.
58+
ctx1, cancel1 := context.WithCancel(ctx)
59+
_, err := ss.Client.FullDuplexCall(ctx1)
60+
if err != nil {
61+
t.Fatal("Error starting call:", err)
62+
}
63+
64+
// Wait for the handler to be invoked.
65+
select {
66+
case <-started:
67+
case <-ctx.Done():
68+
t.Fatalf("Timed out waiting for RPC to start on server.")
69+
}
70+
71+
// Cancel it on the client. The server handler will still be running.
72+
cancel1()
73+
74+
ctx2, cancel2 := context.WithCancel(ctx)
75+
defer cancel2()
76+
s, err := ss.Client.FullDuplexCall(ctx2)
77+
if err != nil {
78+
t.Fatal("Error starting call:", err)
79+
}
80+
81+
// After 100ms, allow the first call to unblock. That should allow the
82+
// second RPC to run and finish.
83+
select {
84+
case <-started:
85+
blockCalls.Fire()
86+
t.Fatalf("RPC started unexpectedly.")
87+
case <-time.After(100 * time.Millisecond):
88+
blockCalls.Fire()
89+
}
90+
91+
select {
92+
case <-started:
93+
case <-ctx.Done():
94+
t.Fatalf("Timed out waiting for second RPC to start on server.")
95+
}
96+
if _, err := s.Recv(); err != io.EOF {
97+
t.Fatal("Received unexpected RPC error:", err)
98+
}
99+
}

0 commit comments

Comments
 (0)