Skip to content

Commit 6c443c2

Browse files
committed
cache: remove early compaction check from Cache.Watch
Signed-off-by: Peter Chang <[email protected]>
1 parent 431a65a commit 6c443c2

File tree

3 files changed

+48
-30
lines changed

3 files changed

+48
-30
lines changed

cache/cache.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
26+
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
2627
clientv3 "go.etcd.io/etcd/client/v3"
2728
)
2829

@@ -95,18 +96,6 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
9596
op := clientv3.OpWatch(key, opts...)
9697
startRev := op.Rev()
9798

98-
if startRev != 0 {
99-
if oldest := c.demux.PeekOldest(); oldest != 0 && startRev < oldest {
100-
ch := make(chan clientv3.WatchResponse, 1)
101-
ch <- clientv3.WatchResponse{
102-
Canceled: true,
103-
CompactRevision: startRev,
104-
}
105-
close(ch)
106-
return ch
107-
}
108-
}
109-
11099
pred, err := c.validateWatch(key, op)
111100
if err != nil {
112101
ch := make(chan clientv3.WatchResponse, 1)
@@ -132,6 +121,13 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
132121
return
133122
case events, ok := <-w.eventQueue:
134123
if !ok {
124+
if w.cancelResp != nil {
125+
select {
126+
case <-ctx.Done():
127+
case <-c.internalCtx.Done():
128+
case responseChan <- *w.cancelResp:
129+
}
130+
}
135131
return
136132
}
137133
select {
@@ -299,7 +295,11 @@ func (c *Cache) watchEvents(watchCh clientv3.WatchChan, applyErr <-chan error, r
299295
readyOnce.Do(func() { c.ready.Set() })
300296
if err := resp.Err(); err != nil {
301297
c.ready.Reset()
302-
c.demux.Purge()
298+
if errors.Is(err, rpctypes.ErrCompacted) || resp.CompactRevision > 0 {
299+
c.demux.Compact(resp.CompactRevision)
300+
} else {
301+
c.demux.Purge()
302+
}
303303
return err
304304
}
305305
c.demux.Broadcast(resp.Events)

cache/demux.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (d *demux) Broadcast(events []*clientv3.Event) {
151151
}
152152
}
153153

154-
// Purge is called when etcd compaction invalidates our cached history, so clients should resubscribe.
154+
// Purge stops all watchers and rebase history on watch errors
155155
func (d *demux) Purge() {
156156
d.mu.Lock()
157157
defer d.mu.Unlock()
@@ -166,6 +166,21 @@ func (d *demux) Purge() {
166166
d.laggingWatchers = make(map[*watcher]int64)
167167
}
168168

169+
// Compact is called when etcd reports a compaction at compactRev to rebase history;
170+
// it keeps provably-too-old watchers for later cancellation, stops others, and clients should resubscribe.
171+
func (d *demux) Compact(compactRev int64) {
172+
d.mu.Lock()
173+
defer d.mu.Unlock()
174+
d.history.RebaseHistory()
175+
176+
for w, next := range d.activeWatchers {
177+
if next != 0 && next <= compactRev {
178+
delete(d.activeWatchers, w)
179+
d.laggingWatchers[w] = next
180+
}
181+
}
182+
}
183+
169184
func (d *demux) resyncLaggingWatchers() {
170185
d.mu.Lock()
171186
defer d.mu.Unlock()
@@ -177,7 +192,7 @@ func (d *demux) resyncLaggingWatchers() {
177192

178193
for w, nextRev := range d.laggingWatchers {
179194
if nextRev < oldestRev {
180-
w.Stop()
195+
w.Compact(nextRev)
181196
delete(d.laggingWatchers, w)
182197
continue
183198
}
@@ -200,9 +215,3 @@ func (d *demux) resyncLaggingWatchers() {
200215
}
201216
}
202217
}
203-
204-
func (d *demux) PeekOldest() int64 {
205-
d.mu.RLock()
206-
defer d.mu.RUnlock()
207-
return d.history.PeekOldest()
208-
}

cache/watcher.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,24 @@
1515
package cache
1616

1717
import (
18-
"sync/atomic"
18+
"sync"
1919

20+
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
2021
clientv3 "go.etcd.io/etcd/client/v3"
2122
)
2223

2324
// watcher holds one client’s buffered stream of events.
2425
type watcher struct {
2526
eventQueue chan []*clientv3.Event
27+
cancelResp *clientv3.WatchResponse
2628
keyPred KeyPredicate
27-
stopped int32
28-
done chan struct{} // closed together with Stop()
29+
stopOnce sync.Once
2930
}
3031

3132
func newWatcher(bufSize int, pred KeyPredicate) *watcher {
3233
return &watcher{
3334
eventQueue: make(chan []*clientv3.Event, bufSize),
3435
keyPred: pred,
35-
done: make(chan struct{}),
3636
}
3737
}
3838

@@ -59,12 +59,21 @@ func (w *watcher) enqueueEvent(eventBatch []*clientv3.Event) bool {
5959
}
6060
}
6161

62+
func (w *watcher) Compact(compactRev int64) {
63+
resp := &clientv3.WatchResponse{
64+
Canceled: true,
65+
CompactRevision: compactRev,
66+
CancelReason: rpctypes.ErrCompacted.Error(),
67+
}
68+
w.stopOnce.Do(func() {
69+
w.cancelResp = resp
70+
close(w.eventQueue)
71+
})
72+
}
73+
6274
// Stop closes the event channel atomically.
6375
func (w *watcher) Stop() {
64-
if atomic.CompareAndSwapInt32(&w.stopped, 0, 1) {
76+
w.stopOnce.Do(func() {
6577
close(w.eventQueue)
66-
close(w.done)
67-
}
78+
})
6879
}
69-
70-
func (w *watcher) Done() <-chan struct{} { return w.done }

0 commit comments

Comments
 (0)