Skip to content

Commit d1d3dc2

Browse files
authored
Merge pull request #20245 from serathius/skip-future-progress-notification-release-3.5
[release-3.5] Skip sending progress notification for watch with starting revision in the future
2 parents 27d9b9d + 2d5a9b9 commit d1d3dc2

File tree

3 files changed

+95
-33
lines changed

3 files changed

+95
-33
lines changed

server/etcdserver/api/v3rpc/watch.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,7 @@ func (sws *serverWatchStream) recvLoop() error {
303303

304304
filters := FiltersFromRequest(creq)
305305

306-
wsrev := sws.watchStream.Rev()
307-
rev := creq.StartRevision
308-
if rev == 0 {
309-
rev = wsrev + 1
310-
}
311-
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
306+
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, creq.StartRevision, filters...)
312307
if err == nil {
313308
sws.mu.Lock()
314309
if creq.ProgressNotify {
@@ -326,7 +321,7 @@ func (sws *serverWatchStream) recvLoop() error {
326321
}
327322

328323
wr := &pb.WatchResponse{
329-
Header: sws.newResponseHeader(wsrev),
324+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
330325
WatchId: int64(id),
331326
Created: true,
332327
Canceled: err != nil,

server/mvcc/watchable_store.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,13 @@ func (s *watchableStore) NewWatchStream() WatchStream {
117117

118118
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
119119
wa := &watcher{
120-
key: key,
121-
end: end,
122-
minRev: startRev,
123-
id: id,
124-
ch: ch,
125-
fcs: fcs,
120+
key: key,
121+
end: end,
122+
startRev: startRev,
123+
minRev: startRev,
124+
id: id,
125+
ch: ch,
126+
fcs: fcs,
126127
}
127128

128129
s.mu.Lock()
@@ -495,11 +496,15 @@ func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseW
495496
s.mu.RLock()
496497
defer s.mu.RUnlock()
497498

499+
rev := s.rev()
498500
// Any watcher unsynced?
499501
for _, w := range watchers {
500502
if _, ok := s.synced.watchers[w]; !ok {
501503
return false
502504
}
505+
if rev < w.startRev {
506+
return false
507+
}
503508
}
504509

505510
// If all watchers are synchronised, send out progress
@@ -508,7 +513,7 @@ func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseW
508513
// notification will be broadcasted client-side if required
509514
// (see dispatchEvent in client/v3/watch.go)
510515
for _, w := range watchers {
511-
w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
516+
w.send(WatchResponse{WatchID: responseWatchID, Revision: rev})
512517
return true
513518
}
514519
return true
@@ -535,6 +540,7 @@ type watcher struct {
535540
// except when the watcher were to be moved from "synced" watcher group
536541
restore bool
537542

543+
startRev int64
538544
// minRev is the minimum revision update the watcher will accept
539545
minRev int64
540546
id WatchID

server/mvcc/watcher_test.go

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
288288
}
289289
}
290290

291-
// TestWatcherRequestProgress ensures synced watcher can correctly
292-
// report its correct progress.
293-
func TestWatcherRequestProgress(t *testing.T) {
291+
func TestWatcherRequestProgressBadId(t *testing.T) {
294292
b, tmpPath := betesting.NewDefaultTmpBackend(t)
295293

296294
// manually create watchableStore instead of newWatchableStore
@@ -302,14 +300,12 @@ func TestWatcherRequestProgress(t *testing.T) {
302300
unsynced: newWatcherGroup(),
303301
synced: newWatcherGroup(),
304302
}
305-
306303
defer func() {
307304
s.store.Close()
308305
os.Remove(tmpPath)
309306
}()
310307

311308
testKey := []byte("foo")
312-
notTestKey := []byte("bad")
313309
testValue := []byte("bar")
314310
s.Put(testKey, testValue, lease.NoLease)
315311

@@ -322,26 +318,91 @@ func TestWatcherRequestProgress(t *testing.T) {
322318
t.Fatalf("unexpected %+v", resp)
323319
default:
324320
}
321+
}
325322

326-
id, _ := w.Watch(0, notTestKey, nil, 1)
327-
w.RequestProgress(id)
328-
select {
329-
case resp := <-w.Chan():
330-
t.Fatalf("unexpected %+v", resp)
331-
default:
323+
func TestWatcherRequestProgress(t *testing.T) {
324+
testKey := []byte("foo")
325+
notTestKey := []byte("bad")
326+
testValue := []byte("bar")
327+
tcs := []struct {
328+
name string
329+
startRev int64
330+
expectProgressBeforeSync bool
331+
expectProgressAfterSync bool
332+
}{
333+
{
334+
name: "Zero revision",
335+
startRev: 0,
336+
expectProgressBeforeSync: true,
337+
expectProgressAfterSync: true,
338+
},
339+
{
340+
name: "Old revision",
341+
startRev: 1,
342+
expectProgressAfterSync: true,
343+
},
344+
{
345+
name: "Current revision",
346+
startRev: 2,
347+
expectProgressAfterSync: true,
348+
},
349+
{
350+
name: "Current revision plus one",
351+
startRev: 3,
352+
},
353+
{
354+
name: "Current revision plus two",
355+
startRev: 4,
356+
},
332357
}
358+
for _, tc := range tcs {
359+
t.Run(tc.name, func(t *testing.T) {
360+
b, tmpPath := betesting.NewDefaultTmpBackend(t)
361+
362+
// manually create watchableStore instead of newWatchableStore
363+
// because newWatchableStore automatically calls syncWatchers
364+
// method to sync watchers in unsynced map. We want to keep watchers
365+
// in unsynced to test if syncWatchers works as expected.
366+
s := &watchableStore{
367+
store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
368+
unsynced: newWatcherGroup(),
369+
synced: newWatcherGroup(),
370+
}
333371

334-
s.syncWatchers()
372+
defer func() {
373+
s.store.Close()
374+
os.Remove(tmpPath)
375+
}()
376+
377+
s.Put(testKey, testValue, lease.NoLease)
335378

336-
w.RequestProgress(id)
337-
wrs := WatchResponse{WatchID: id, Revision: 2}
379+
w := s.NewWatchStream()
380+
381+
id, _ := w.Watch(0, notTestKey, nil, tc.startRev)
382+
w.RequestProgress(id)
383+
asssertProgressSent(t, w, id, tc.expectProgressBeforeSync)
384+
s.syncWatchers()
385+
w.RequestProgress(id)
386+
asssertProgressSent(t, w, id, tc.expectProgressAfterSync)
387+
})
388+
}
389+
}
390+
391+
func asssertProgressSent(t *testing.T, stream WatchStream, id WatchID, expectProgress bool) {
338392
select {
339-
case resp := <-w.Chan():
340-
if !reflect.DeepEqual(resp, wrs) {
341-
t.Fatalf("got %+v, expect %+v", resp, wrs)
393+
case resp := <-stream.Chan():
394+
if expectProgress {
395+
wrs := WatchResponse{WatchID: id, Revision: 2}
396+
if !reflect.DeepEqual(resp, wrs) {
397+
t.Fatalf("got %+v, expect %+v", resp, wrs)
398+
}
399+
} else {
400+
t.Fatalf("unexpected response %+v", resp)
401+
}
402+
default:
403+
if expectProgress {
404+
t.Fatalf("failed to receive progress")
342405
}
343-
case <-time.After(time.Second):
344-
t.Fatal("failed to receive progress")
345406
}
346407
}
347408

0 commit comments

Comments
 (0)