Skip to content

Commit ed9471c

Browse files
prasadborole1Neniel
authored andcommitted
[SPIRE Agent] Add size based backoff strategy for fetchSVIDs (spiffe#4279)
* [SPIRE Agent] Add size based backoff strategy for fetchSVIDs Signed-off-by: Prasad Borole <[email protected]> Signed-off-by: Neniel <[email protected]>
1 parent eff680a commit ed9471c

File tree

7 files changed

+195
-6
lines changed

7 files changed

+195
-6
lines changed

doc/spire_agent.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ This may be useful for templating configuration files, for example across differ
7171
| experimental | Description | Default |
7272
|:------------------|-----------------------------------------------------------------|-------------------------|
7373
| `named_pipe_name` | Pipe name to bind the SPIRE Agent API named pipe (Windows only) | \spire-agent\public\api |
74+
| `sync_interval` | Interval between syncs with the SPIRE Server (exponential backoff is applied) | 5 sec |
7475

7576
### Initial trust bundle configuration
7677

pkg/agent/common/backoff/backoff.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@ const (
1818
_noMaxElapsedTime = 0
1919
)
2020

21+
// Option allows customization of the backoff.ExponentialBackOff
22+
type Option interface {
23+
applyOption(*backoff.ExponentialBackOff)
24+
}
25+
2126
// NewBackoff returns a new backoff calculator ready for use. Generalizes all backoffs
2227
// to have the same behavioral pattern, though with different bounds based on given
2328
// interval.
24-
func NewBackoff(clk clock.Clock, interval time.Duration) BackOff {
29+
func NewBackoff(clk clock.Clock, interval time.Duration, opts ...Option) BackOff {
2530
b := &backoff.ExponentialBackOff{
2631
Clock: clk,
2732
InitialInterval: interval,
@@ -30,7 +35,23 @@ func NewBackoff(clk clock.Clock, interval time.Duration) BackOff {
3035
MaxInterval: _maxIntervalMultiple * interval,
3136
MaxElapsedTime: _noMaxElapsedTime,
3237
}
38+
for _, opt := range opts {
39+
opt.applyOption(b)
40+
}
3341
b.Reset()
3442

3543
return b
3644
}
45+
46+
// WithMaxInterval returns maxInterval backoff option to override the MaxInterval
47+
func WithMaxInterval(maxInterval time.Duration) Option {
48+
return backoffOption{maxInterval: maxInterval}
49+
}
50+
51+
type backoffOption struct {
52+
maxInterval time.Duration
53+
}
54+
55+
func (b backoffOption) applyOption(bo *backoff.ExponentialBackOff) {
56+
bo.MaxInterval = b.maxInterval
57+
}

pkg/agent/common/backoff/backoff_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,28 @@ func TestBackOff(t *testing.T) {
3030
inRange(t, expectedResults[0], b)
3131
}
3232

33+
func TestBackOffWithMaxInterval(t *testing.T) {
34+
testInitialInterval := 6400 * time.Millisecond
35+
36+
mockClk := clock.NewMock(t)
37+
b := NewBackoff(mockClk, testInitialInterval, WithMaxInterval(33000*time.Millisecond))
38+
39+
expectedResults := []time.Duration{}
40+
for _, d := range []int{6400, 9600, 14400, 21600, 32400, 33000, 33000} {
41+
expectedResults = append(expectedResults, time.Duration(d)*time.Millisecond)
42+
}
43+
44+
for _, expected := range expectedResults {
45+
// Assert that the next backoff falls in the expected range.
46+
inRange(t, expected, b)
47+
mockClk.Add(expected)
48+
}
49+
50+
// assert reset works as expected
51+
b.Reset()
52+
inRange(t, expectedResults[0], b)
53+
}
54+
3355
func inRange(t *testing.T, expected time.Duration, b BackOff) {
3456
var minInterval = expected - time.Duration(_jitter*float64(expected))
3557
var maxInterval = expected + time.Duration(_jitter*float64(expected))
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package backoff
2+
3+
// SizeLimitedBackOff defines the interface for a size based backoff, used for
// requests that carry a number of records to be processed by the server.
// Unlike a time based backoff it yields a request size, not a wait duration.
type SizeLimitedBackOff interface {
	// NextBackOff returns the number of records to include in the next
	// request.
	NextBackOff() int

	// Success tells the backoff implementation that the previous request
	// succeeded so that it can adjust the size of the next request.
	Success()

	// Failure tells the backoff implementation that the previous request
	// failed so that it can adjust the size of the next request.
	Failure()

	// Reset returns the backoff to its initial state.
	Reset()
}

// _minRequestSize is the smallest request size the backoff will ever return.
const _minRequestSize = 1

// sizeLimitedBackOff is a SizeLimitedBackOff that halves the request size on
// failure and doubles it on success, keeping it within
// [_minRequestSize, maxSize].
type sizeLimitedBackOff struct {
	currentSize int
	maxSize     int
}

var _ SizeLimitedBackOff = (*sizeLimitedBackOff)(nil)

// NextBackOff returns the current request size.
func (r *sizeLimitedBackOff) NextBackOff() int {
	return r.currentSize
}

// Success doubles the request size, capping it at maxSize.
func (r *sizeLimitedBackOff) Success() {
	// Compare against half of maxSize instead of doubling first: for a large
	// maxSize, currentSize*2 can overflow int and wrap to a negative value
	// that would slip past a "> maxSize" check.
	if r.currentSize > r.maxSize/2 {
		r.currentSize = r.maxSize
		return
	}
	r.currentSize *= 2
}

// Failure halves the request size, never going below _minRequestSize.
func (r *sizeLimitedBackOff) Failure() {
	newSize := r.currentSize / 2
	if newSize < _minRequestSize {
		newSize = _minRequestSize
	}
	r.currentSize = newSize
}

// Reset sets the request size back to maxSize.
func (r *sizeLimitedBackOff) Reset() {
	r.currentSize = r.maxSize
}

// NewSizeLimitedBackOff returns a new SizeLimitedBackOff with the provided
// maxRequestSize and a lowest request size of 1. On Failure the size is halved
// and on Success it is doubled. A maxRequestSize below 1 is treated as 1 so
// that the returned size is always a usable, positive request size.
func NewSizeLimitedBackOff(maxRequestSize int) SizeLimitedBackOff {
	if maxRequestSize < _minRequestSize {
		maxRequestSize = _minRequestSize
	}

	b := &sizeLimitedBackOff{
		maxSize: maxRequestSize,
	}
	b.Reset()

	return b
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package backoff
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestRequestSizeBackOff(t *testing.T) {
10+
t.Run("max request size does not go above configured maximum", func(t *testing.T) {
11+
maxRequestSize := 1000
12+
b := NewSizeLimitedBackOff(maxRequestSize)
13+
14+
// Initial backoff value should be equal to the maxRequestSize
15+
assert.Equal(t, maxRequestSize, b.NextBackOff())
16+
17+
// After multiple successes, the backoff value should cap at maxRequestSize
18+
b.Success()
19+
assert.Equal(t, maxRequestSize, b.NextBackOff())
20+
b.Success()
21+
assert.Equal(t, maxRequestSize, b.NextBackOff())
22+
})
23+
t.Run("min request size does not go below 1", func(t *testing.T) {
24+
// validate lower limit
25+
maxRequestSize := 5
26+
b := NewSizeLimitedBackOff(maxRequestSize)
27+
assert.Equal(t, maxRequestSize, b.NextBackOff())
28+
29+
b.Failure()
30+
assert.Equal(t, 2, b.NextBackOff())
31+
b.Failure()
32+
assert.Equal(t, 1, b.NextBackOff())
33+
34+
// backoff value should not go below 1
35+
b.Failure()
36+
assert.Equal(t, 1, b.NextBackOff())
37+
})
38+
t.Run("backoff updates on Failure, Success and Reset", func(t *testing.T) {
39+
maxRequestSize := 1000
40+
b := NewSizeLimitedBackOff(maxRequestSize)
41+
// After a failure, the backoff value should be halved
42+
b.Failure()
43+
assert.Equal(t, maxRequestSize/2, b.NextBackOff())
44+
45+
// After multiple failures, the backoff value should keep halving
46+
b.Failure()
47+
b.Failure()
48+
assert.Equal(t, maxRequestSize/8, b.NextBackOff())
49+
50+
// After success backoff value should keep doubling
51+
b.Success()
52+
assert.Equal(t, maxRequestSize/4, b.NextBackOff())
53+
b.Success()
54+
assert.Equal(t, maxRequestSize/2, b.NextBackOff())
55+
56+
// Reset should set the backoff value back to the initial maxRequestSize
57+
b.Reset()
58+
assert.Equal(t, maxRequestSize, b.NextBackOff())
59+
})
60+
}

pkg/agent/manager/manager.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,18 @@ import (
2222
"github.com/spiffe/spire/pkg/common/rotationutil"
2323
"github.com/spiffe/spire/pkg/common/telemetry"
2424
"github.com/spiffe/spire/pkg/common/util"
25+
"github.com/spiffe/spire/pkg/server/api/limits"
2526
"github.com/spiffe/spire/proto/spire/common"
2627
)
2728

29+
const (
	// synchronizeMaxIntervalMultiple is multiplied by the configured sync
	// interval to get the maximum synchronize backoff interval; with a sync
	// interval of 5 sec this gives a maximum of 4 mins.
	synchronizeMaxIntervalMultiple = 48
	// synchronizeMaxInterval is the upper limit (8 mins) on the maximum
	// synchronize backoff interval, used for larger sync intervals.
	synchronizeMaxInterval = 8 * time.Minute

	// maxSVIDSyncInterval is the maximum backoff interval for SVID sync.
	maxSVIDSyncInterval = 4 * time.Minute
)
36+
2837
// Manager provides cache management functionalities for agents.
2938
type Manager interface {
3039
// Initialize initializes the manager.
@@ -132,6 +141,8 @@ type manager struct {
132141
// fetch attempt
133142
synchronizeBackoff backoff.BackOff
134143
svidSyncBackoff backoff.BackOff
144+
// csrSizeLimitedBackoff backs off the number of csrs if error is returned on fetch svid attempt
145+
csrSizeLimitedBackoff backoff.SizeLimitedBackOff
135146

136147
client client.Client
137148

@@ -148,8 +159,13 @@ func (m *manager) Initialize(ctx context.Context) error {
148159
m.storeSVID(m.svid.State().SVID, m.svid.State().Reattestable)
149160
m.storeBundle(m.cache.Bundle())
150161

151-
m.synchronizeBackoff = backoff.NewBackoff(m.clk, m.c.SyncInterval)
152-
m.svidSyncBackoff = backoff.NewBackoff(m.clk, cache.SVIDSyncInterval)
162+
synchronizeBackoffMaxInterval := synchronizeMaxIntervalMultiple * m.c.SyncInterval
163+
if synchronizeBackoffMaxInterval > synchronizeMaxInterval {
164+
synchronizeBackoffMaxInterval = synchronizeMaxInterval
165+
}
166+
m.synchronizeBackoff = backoff.NewBackoff(m.clk, m.c.SyncInterval, backoff.WithMaxInterval(synchronizeBackoffMaxInterval))
167+
m.svidSyncBackoff = backoff.NewBackoff(m.clk, cache.SVIDSyncInterval, backoff.WithMaxInterval(maxSVIDSyncInterval))
168+
m.csrSizeLimitedBackoff = backoff.NewSizeLimitedBackOff(limits.SignLimitPerIP)
153169

154170
err := m.synchronize(ctx)
155171
if nodeutil.ShouldAgentReattest(err) {

pkg/agent/manager/sync.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/spiffe/spire/pkg/common/telemetry"
1616
telemetry_agent "github.com/spiffe/spire/pkg/common/telemetry/agent"
1717
"github.com/spiffe/spire/pkg/common/util"
18-
"github.com/spiffe/spire/pkg/server/api/limits"
1918
"github.com/spiffe/spire/proto/spire/common"
2019
)
2120

@@ -116,14 +115,15 @@ func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVI
116115
staleEntries := c.GetStaleEntries()
117116
if len(staleEntries) > 0 {
118117
var csrs []csrRequest
118+
sizeLimit := m.csrSizeLimitedBackoff.NextBackOff()
119119
log.WithFields(logrus.Fields{
120120
telemetry.Count: len(staleEntries),
121-
telemetry.Limit: limits.SignLimitPerIP,
121+
telemetry.Limit: sizeLimit,
122122
}).Debug("Renewing stale entries")
123123

124124
for _, entry := range staleEntries {
125125
// we've exceeded the CSR limit, don't make any more CSRs
126-
if len(csrs) >= limits.SignLimitPerIP {
126+
if len(csrs) >= sizeLimit {
127127
break
128128
}
129129

@@ -148,6 +148,11 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U
148148
// Put all the CSRs in an array to make just one call with all the CSRs.
149149
counter := telemetry_agent.StartManagerFetchSVIDsUpdatesCall(m.c.Metrics)
150150
defer counter.Done(&err)
151+
defer func() {
152+
if err == nil {
153+
m.csrSizeLimitedBackoff.Success()
154+
}
155+
}()
151156

152157
csrsIn := make(map[string][]byte)
153158

@@ -180,6 +185,8 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U
180185

181186
svidsOut, err := m.client.NewX509SVIDs(ctx, csrsIn)
182187
if err != nil {
188+
// Reduce csr size for next invocation
189+
m.csrSizeLimitedBackoff.Failure()
183190
return nil, err
184191
}
185192

0 commit comments

Comments
 (0)