Skip to content

Commit 2332fff

Browse files
committed
Timeout if watch doesn't collect all events within 1 second after writes finish and max revision is provided
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 6c443c2 commit 2332fff

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

tests/robustness/client/watch.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,22 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"time"
2122

2223
"go.uber.org/zap"
2324
"golang.org/x/sync/errgroup"
2425

2526
"go.etcd.io/etcd/tests/v3/robustness/report"
2627
)
2728

29+
var watchEventCollectionTimeout = time.Second
30+
2831
func CollectClusterWatchEvents(ctx context.Context, lg *zap.Logger, endpoints []string, maxRevisionChan <-chan int64, cfg WatchConfig, clientSet *ClientSet) error {
2932
var g errgroup.Group
3033
reports := make([]report.ClientReport, len(endpoints))
3134
memberMaxRevisionChans := make([]chan int64, len(endpoints))
35+
ctx, cancel := context.WithCancel(ctx)
36+
defer cancel()
3237
for i, endpoint := range endpoints {
3338
memberMaxRevisionChan := make(chan int64, 1)
3439
memberMaxRevisionChans[i] = memberMaxRevisionChan
@@ -49,6 +54,8 @@ func CollectClusterWatchEvents(ctx context.Context, lg *zap.Logger, endpoints []
4954
for _, memberChan := range memberMaxRevisionChans {
5055
memberChan <- maxRevision
5156
}
57+
time.Sleep(watchEventCollectionTimeout)
58+
cancel()
5259
return nil
5360
})
5461
return g.Wait()
@@ -62,19 +69,27 @@ type WatchConfig struct {
6269
func watchUntilRevision(ctx context.Context, lg *zap.Logger, c *RecordingClient, maxRevisionChan <-chan int64, cfg WatchConfig) error {
6370
var maxRevision int64
6471
var lastRevision int64 = 1
65-
var closing bool
6672
ctx, cancel := context.WithCancel(ctx)
6773
defer cancel()
6874
resetWatch:
6975
for {
70-
if closing {
76+
select {
77+
case <-ctx.Done():
78+
select {
79+
case revision, ok := <-maxRevisionChan:
80+
if ok {
81+
maxRevision = revision
82+
}
83+
default:
84+
}
7185
if maxRevision == 0 {
7286
return errors.New("Client didn't collect all events, max revision not set")
7387
}
7488
if lastRevision < maxRevision {
75-
return fmt.Errorf("Client didn't collect all events, got: %d, expected: %d", lastRevision, maxRevision)
89+
return fmt.Errorf("Client didn't collect all events, revision got: %d, expected: %d", lastRevision, maxRevision)
7690
}
7791
return nil
92+
default:
7893
}
7994
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
8095
for {
@@ -83,13 +98,11 @@ resetWatch:
8398
if ok {
8499
maxRevision = revision
85100
if lastRevision >= maxRevision {
86-
closing = true
87101
cancel()
88102
}
89103
} else {
90104
// Only cancel if maxRevision was never set.
91105
if maxRevision == 0 {
92-
closing = true
93106
cancel()
94107
}
95108
}
@@ -115,7 +128,6 @@ resetWatch:
115128
lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
116129
}
117130
if maxRevision != 0 && lastRevision >= maxRevision {
118-
closing = true
119131
cancel()
120132
}
121133
}

0 commit comments

Comments
 (0)