Skip to content

Commit 2d72ea1

Browse files
authored
Merge pull request #20290 from serathius/watch-restore-future-release-3.5
[release-3.5] Avoid lowering revision of watchers in the future after restore
2 parents b247e0a + fe30a6e commit 2d72ea1

File tree

2 files changed

+99
-84
lines changed

2 files changed

+99
-84
lines changed

server/mvcc/watchable_store.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ var (
3939

4040
// maxWatchersPerSync is the number of watchers to sync in a single batch
4141
maxWatchersPerSync = 512
42+
43+
// maxResyncPeriod is the period of executing resync.
44+
watchResyncPeriod = 100 * time.Millisecond
4245
)
4346

4447
type watchable interface {
@@ -228,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {
228231
}
229232
syncDuration := time.Since(st)
230233

231-
waitDuration := 100 * time.Millisecond
234+
waitDuration := watchResyncPeriod
232235
// more work pending?
233236
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
234237
// be fair to other store operations by yielding time taken
@@ -374,7 +377,7 @@ func (s *watchableStore) syncWatchers() int {
374377
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
375378
continue
376379
}
377-
w.minRev = curRev + 1
380+
w.minRev = max(curRev+1, w.minRev)
378381

379382
eb, ok := wb[w]
380383
if !ok {

server/mvcc/watchable_store_test.go

Lines changed: 94 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/google/go-cmp/cmp"
2728
"github.com/prometheus/client_golang/prometheus/testutil"
2829
"github.com/stretchr/testify/require"
2930
"go.uber.org/zap"
@@ -538,104 +539,115 @@ func TestWatchFutureRev(t *testing.T) {
538539
}
539540

540541
func TestWatchRestore(t *testing.T) {
541-
test := func(delay time.Duration) func(t *testing.T) {
542-
return func(t *testing.T) {
543-
b, tmpPath := betesting.NewDefaultTmpBackend(t)
544-
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
545-
defer cleanup(s, b, tmpPath)
546-
547-
testKey := []byte("foo")
548-
testValue := []byte("bar")
549-
w := s.NewWatchStream()
550-
defer w.Close()
551-
w.Watch(0, testKey, nil, 1)
542+
resyncDelay := watchResyncPeriod * 3 / 2
552543

553-
time.Sleep(delay)
554-
wantRev := s.Put(testKey, testValue, lease.NoLease)
544+
t.Run("NoResync", func(t *testing.T) {
545+
testWatchRestore(t, 0, 0)
546+
})
547+
t.Run("ResyncBefore", func(t *testing.T) {
548+
testWatchRestore(t, resyncDelay, 0)
549+
})
550+
t.Run("ResyncAfter", func(t *testing.T) {
551+
testWatchRestore(t, 0, resyncDelay)
552+
})
555553

556-
s.Restore(b)
557-
events := readEventsForSecond(w.Chan())
558-
if len(events) != 1 {
559-
t.Errorf("Expected only one event, got %d", len(events))
560-
}
561-
if events[0].Kv.ModRevision != wantRev {
562-
t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
563-
}
554+
t.Run("ResyncBeforeAndAfter", func(t *testing.T) {
555+
testWatchRestore(t, resyncDelay, resyncDelay)
556+
})
557+
}
564558

565-
}
559+
func testWatchRestore(t *testing.T, delayBeforeRestore, delayAfterRestore time.Duration) {
560+
b, tmpPath := betesting.NewDefaultTmpBackend(t)
561+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
562+
defer cleanup(s, b, tmpPath)
563+
564+
testKey := []byte("foo")
565+
testValue := []byte("bar")
566+
567+
tcs := []struct {
568+
name string
569+
startRevision int64
570+
wantEvents []mvccpb.Event
571+
}{
572+
{
573+
name: "zero revision",
574+
startRevision: 0,
575+
wantEvents: []mvccpb.Event{
576+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
577+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
578+
},
579+
},
580+
{
581+
name: "revsion before first write",
582+
startRevision: 1,
583+
wantEvents: []mvccpb.Event{
584+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
585+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
586+
},
587+
},
588+
{
589+
name: "revision of first write",
590+
startRevision: 2,
591+
wantEvents: []mvccpb.Event{
592+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
593+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
594+
},
595+
},
596+
{
597+
name: "current revision",
598+
startRevision: 3,
599+
wantEvents: []mvccpb.Event{
600+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
601+
},
602+
},
603+
{
604+
name: "future revision",
605+
startRevision: 4,
606+
wantEvents: []mvccpb.Event{},
607+
},
608+
}
609+
watchers := []WatchStream{}
610+
for i, tc := range tcs {
611+
w := s.NewWatchStream()
612+
defer w.Close()
613+
watchers = append(watchers, w)
614+
w.Watch(WatchID(i+1), testKey, nil, tc.startRevision)
566615
}
567616

568-
t.Run("Normal", test(0))
569-
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
617+
s.Put(testKey, testValue, lease.NoLease)
618+
time.Sleep(delayBeforeRestore)
619+
s.Restore(b)
620+
time.Sleep(delayAfterRestore)
621+
s.DeleteRange(testKey, nil)
622+
623+
for i, tc := range tcs {
624+
t.Run(tc.name, func(t *testing.T) {
625+
events := readEventsForSecond(t, watchers[i].Chan())
626+
if diff := cmp.Diff(tc.wantEvents, events); diff != "" {
627+
t.Errorf("unexpected events (-want +got):\n%s", diff)
628+
}
629+
})
630+
}
570631
}
571632

572-
func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
633+
func readEventsForSecond(t *testing.T, ws <-chan WatchResponse) []mvccpb.Event {
634+
events := []mvccpb.Event{}
635+
deadline := time.After(time.Second)
573636
for {
574637
select {
575638
case resp := <-ws:
639+
if len(resp.Events) == 0 {
640+
t.Fatalf("Events should never be empty, resp: %+v", resp)
641+
}
576642
events = append(events, resp.Events...)
577-
case <-time.After(time.Second):
643+
case <-deadline:
644+
return events
645+
case <-time.After(watchResyncPeriod * 3 / 2):
578646
return events
579647
}
580648
}
581649
}
582650

583-
// TestWatchRestoreSyncedWatcher tests such a case that:
584-
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
585-
// 2. watcher with a future revision is added to "synced" watcher group
586-
// 3. restore/overwrite storage with snapshot of a higher lasat revision
587-
// 4. restore operation moves "synced" to "unsynced" watcher group
588-
// 5. choose the watcher from step 1, without panic
589-
func TestWatchRestoreSyncedWatcher(t *testing.T) {
590-
b1, b1Path := betesting.NewDefaultTmpBackend(t)
591-
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, StoreConfig{})
592-
defer cleanup(s1, b1, b1Path)
593-
594-
b2, b2Path := betesting.NewDefaultTmpBackend(t)
595-
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, StoreConfig{})
596-
defer cleanup(s2, b2, b2Path)
597-
598-
testKey, testValue := []byte("foo"), []byte("bar")
599-
rev := s1.Put(testKey, testValue, lease.NoLease)
600-
startRev := rev + 2
601-
602-
// create a watcher with a future revision
603-
// add to "synced" watcher group (startRev > s.store.currentRev)
604-
w1 := s1.NewWatchStream()
605-
w1.Watch(0, testKey, nil, startRev)
606-
607-
// make "s2" ends up with a higher last revision
608-
s2.Put(testKey, testValue, lease.NoLease)
609-
s2.Put(testKey, testValue, lease.NoLease)
610-
611-
// overwrite storage with higher revisions
612-
if err := s1.Restore(b2); err != nil {
613-
t.Fatal(err)
614-
}
615-
616-
// wait for next "syncWatchersLoop" iteration
617-
// and the unsynced watcher should be chosen
618-
time.Sleep(2 * time.Second)
619-
620-
// trigger events for "startRev"
621-
s1.Put(testKey, testValue, lease.NoLease)
622-
623-
select {
624-
case resp := <-w1.Chan():
625-
if resp.Revision != startRev {
626-
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
627-
}
628-
if len(resp.Events) != 1 {
629-
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
630-
}
631-
if resp.Events[0].Kv.ModRevision != startRev {
632-
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
633-
}
634-
case <-time.After(time.Second):
635-
t.Fatal("failed to receive event in 1 second")
636-
}
637-
}
638-
639651
// TestWatchBatchUnsynced tests batching on unsynced watchers
640652
func TestWatchBatchUnsynced(t *testing.T) {
641653
b, tmpPath := betesting.NewDefaultTmpBackend(t)

0 commit comments

Comments
 (0)