Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func (c *Coordinator) Migrate(
options,
store,
backoffFactory,
3, // try enrollment for 3 times and fail
)
if err != nil {
restoreErr := RestoreConfig()
Expand Down
23 changes: 18 additions & 5 deletions internal/pkg/agent/application/enroll/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
)

const (
EnrollBackoffInit = time.Second * 5
EnrollBackoffMax = time.Minute * 10
EnrollBackoffInit = time.Second * 5
EnrollBackoffMax = time.Minute * 10
EnrollInfiniteAttempts = -1

maxRetriesstoreAgentInfo = 5
defaultFleetServerHost = "0.0.0.0"
Expand All @@ -57,6 +58,7 @@ func EnrollWithBackoff(
options EnrollOptions,
configStore saver,
backoffFactory func(done <-chan struct{}) backoff.Backoff,
maxAttempts int,
) error {
if backoffFactory == nil {
backoffFactory = func(done <-chan struct{}) backoff.Backoff {
Expand Down Expand Up @@ -92,26 +94,37 @@ func EnrollWithBackoff(
signal := make(chan struct{})
defer close(signal)
backExp := backoffFactory(signal)
enrollFn := func() error {
return enroll(ctx, log, persistentConfig, client, options, configStore)
}
err = retryEnroll(err, maxAttempts, log, enrollFn, client.URI(), backExp)

return err
}

func retryEnroll(err error, maxAttempts int, log *logger.Logger, enrollFn func() error, clientURI string, backExp backoff.Backoff) error {
attemptNo := 1

RETRYLOOP:
for {
attemptNo++
switch {
case errors.Is(err, fleetapi.ErrTooManyRequests):
log.Warn("Too many requests on the remote server, will retry in a moment.")
case errors.Is(err, fleetapi.ErrConnRefused):
log.Warn("Remote server is not ready to accept connections(Connection Refused), will retry in a moment.")
case errors.Is(err, fleetapi.ErrTemporaryServerError):
log.Warnf("Remote server failed to handle the request(%s), will retry in a moment.", err.Error())
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), err == nil:
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, fleetapi.ErrInvalidToken), err == nil, (maxAttempts != EnrollInfiniteAttempts && attemptNo > maxAttempts):
break RETRYLOOP
case err != nil:
log.Warnf("Error detected: %s, will retry in a moment.", err.Error())
}
if !backExp.Wait() {
break RETRYLOOP
}
log.Infof("Retrying enrollment to URL: %s", client.URI())
err = enroll(ctx, log, persistentConfig, client, options, configStore)
log.Infof("Retrying enrollment to URL: %s", clientURI)
err = enrollFn()
}

return err
Expand Down
85 changes: 85 additions & 0 deletions internal/pkg/agent/application/enroll/enroll_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package enroll

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/pkg/core/logger"
)

// fakeBackoff is a scripted backoff for tests: each call to Wait consumes the
// next entry of results, so a test decides exactly when the retry loop may
// continue and when it is told to stop.
type fakeBackoff struct {
	// results holds the values successive Wait calls return, in order.
	results []bool
}

// Wait returns the next scripted result. Once the script is exhausted it keeps
// returning true, so that something other than the fake (for example the
// attempt limit) is what ends the loop under test.
func (f *fakeBackoff) Wait() bool {
	if len(f.results) == 0 {
		return true
	}
	next := f.results[0]
	f.results = f.results[1:]
	return next
}

// NextWait reports a zero delay; the fake never sleeps.
func (f *fakeBackoff) NextWait() (d time.Duration) { return 0 }

// Reset is a no-op; the script is not rewound.
func (f *fakeBackoff) Reset() {}

// TestRetryEnroll_SucceedsAfterOneRetry checks that a generic initial error
// triggers a retry and that the loop ends as soon as enrollment succeeds.
func TestRetryEnroll_SucceedsAfterOneRetry(t *testing.T) {
	// A generic initial error forces at least one retry.
	initialErr := errors.New("initial failure")

	called := 0
	enrollFn := func() error {
		called++
		// Succeed on the first retry.
		return nil
	}

	fb := &fakeBackoff{results: []bool{true}}

	l := logger.NewWithoutConfig("")

	err := retryEnroll(initialErr, 5, l, enrollFn, "http://localhost", fb)
	require.NoError(t, err)
	require.Equal(t, 1, called)
}

// TestRetryEnroll_BackoffStopsImmediately checks that when the backoff refuses
// the very first wait, no retry is attempted and the error retryEnroll was
// handed is returned unchanged.
func TestRetryEnroll_BackoffStopsImmediately(t *testing.T) {
	initialErr := fmt.Errorf("network")
	called := 0
	enrollFn := func() error {
		called++
		return errors.New("still failing")
	}

	// The first Wait reports false, which must end the loop before enrollFn runs.
	fb := &fakeBackoff{results: []bool{false}}

	l := logger.NewWithoutConfig("")

	err := retryEnroll(initialErr, 5, l, enrollFn, "http://localhost", fb)
	require.Equal(t, 0, called)
	require.ErrorIs(t, err, initialErr)
}

// TestRetryEnroll_StopsAfterMaxAttempts checks that a persistently failing
// enrollment is abandoned once the attempt limit is reached, even though the
// backoff would allow further retries.
func TestRetryEnroll_StopsAfterMaxAttempts(t *testing.T) {
	initialErr := fmt.Errorf("network")
	called := 0
	expectedAttempts := 5
	enrollFn := func() error {
		called++
		return errors.New("still failing")
	}

	// Empty script: Wait always allows another retry, so only maxAttempts can
	// end the loop.
	fb := &fakeBackoff{}

	l := logger.NewWithoutConfig("")

	err := retryEnroll(initialErr, expectedAttempts, l, enrollFn, "http://localhost", fb)
	// retryEnroll starts counting at attempt 1 for the failure it is handed,
	// so only expectedAttempts-1 retries are made.
	require.Equal(t, expectedAttempts-1, called)
	require.Error(t, err)                  // error is expected
	require.NotErrorIs(t, err, initialErr) // subsequent failures are different
}

// TestRetryEnroll_BreaksOnContextCanceled checks that retryEnroll gives up
// immediately when it is handed context.Canceled: the error is returned as-is
// and the enroll function is never invoked.
func TestRetryEnroll_BreaksOnContextCanceled(t *testing.T) {
	// When err is context.Canceled, retryEnroll should return immediately.
	cancelErr := context.Canceled
	called := 0
	enrollFn := func() error {
		called++
		return nil
	}
	fb := &fakeBackoff{results: []bool{true}}

	l := logger.NewWithoutConfig("")

	err := retryEnroll(cancelErr, 5, l, enrollFn, "http://localhost", fb)
	require.ErrorIs(t, err, context.Canceled)
	// Expected value goes first so a failure message labels the values correctly.
	require.Equal(t, 0, called)
}
4 changes: 3 additions & 1 deletion internal/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ func (c *enrollCmd) Execute(ctx context.Context, streams *cli.IOStreams) error {
enrollDelay,
*c.options,
c.configStore,
c.backoffFactory)
c.backoffFactory,
-1, // try indefinitely, let user cancel the action
)
if err != nil {
return fmt.Errorf("fail to enroll: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/fleetapi/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func ExtractError(resp io.Reader) error {
if len(e.Message) == 0 {
return fmt.Errorf("status code: %d, fleet-server returned an error: %s", e.StatusCode, e.Error)
}

return fmt.Errorf(
"status code: %d, fleet-server returned an error: %s, message: %s",
e.StatusCode,
Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/fleetapi/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var ErrConnRefused = errors.New("connection refused")
// ErrTemporaryServerError is returned when the request caused a temporary server error.
var ErrTemporaryServerError = errors.New("temporary server error, please retry later")

// ErrInvalidToken is returned when the client is not authorized to perform
// enrollment, i.e. the server answered the enroll request with
// http.StatusUnauthorized.
var ErrInvalidToken = errors.New("invalid enrollment token")

// temporaryServerErrorCodes defines status codes that allow clients to retry their request.
var temporaryServerErrorCodes = map[int]string{
http.StatusBadGateway: "BadGateway",
Expand Down Expand Up @@ -234,6 +237,10 @@ func (e *EnrollCmd) Execute(ctx context.Context, r *EnrollRequest) (*EnrollRespo
return nil, ErrTooManyRequests
}

if resp.StatusCode == http.StatusUnauthorized {
return nil, ErrInvalidToken
}

if status, temporary := temporaryServerErrorCodes[resp.StatusCode]; temporary {
return nil, fmt.Errorf("received status code %d (%s): %w", resp.StatusCode, status, ErrTemporaryServerError)
}
Expand Down