Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/6906-tsaarni-small.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance in clusters with a large number of endpoints by using go-control-plane LinearCache for EDS.
21 changes: 3 additions & 18 deletions internal/xdscache/v3/endpointslicetranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,25 +339,10 @@ func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) {
// be removed. Since we reset the cluster cache above, all
// the load assignments will be recalculated and we can just
// set the entries rather than merging them.
entries := e.cache.Recalculate()
e.entries = e.cache.Recalculate()

// Only update and notify if entries has changed.
changed := false

e.mu.Lock()
if !equal(e.entries, entries) {
e.entries = entries
changed = true
}
e.mu.Unlock()

if changed {
e.Debug("cluster load assignments changed, notifying waiters")
if e.Observer != nil {
e.Observer.Refresh()
}
} else {
e.Debug("cluster load assignments did not change")
if e.Observer != nil {
e.Observer.Refresh()
}
}

Expand Down
46 changes: 35 additions & 11 deletions internal/xdscache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
type SnapshotHandler struct {
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
defaultCache envoy_cache_v3.SnapshotCache
edsCache envoy_cache_v3.SnapshotCache
edsCache *envoy_cache_v3.LinearCache
mux *envoy_cache_v3.MuxCache
log logrus.FieldLogger
}
Expand All @@ -44,7 +44,12 @@
func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogger) *SnapshotHandler {
var (
defaultCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "defaultCache"))
edsCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "edsCache"))
// Envoy will open EDS stream per CDS entry.
// LinearCache mitigates the issue where all EDS streams are notified of any endpoint changes, even if unrelated to the requested resources.
// With LinearCache, updates are sent only for resources explicitly requested by Envoy.
edsCache = envoy_cache_v3.NewLinearCache(envoy_resource_v3.EndpointType,
envoy_cache_v3.WithVersionPrefix(uuid.NewString()+"-"),
envoy_cache_v3.WithLogger(log.WithField("context", "edsCache")))

mux = &envoy_cache_v3.MuxCache{
Caches: map[string]envoy_cache_v3.Cache{},
Expand Down Expand Up @@ -89,21 +94,40 @@
// Refresh is called when the EndpointSliceTranslator updates values
// in its cache. It updates the EDS cache.
func (s *SnapshotHandler) Refresh() {
version := uuid.NewString()
previouslyNotifiedResources := s.edsCache.GetResources()
currentResources := envoy_cache_v3.IndexRawResourcesByName(asResources(s.resources[envoy_resource_v3.EndpointType].Contents()))

toUpdate := make(map[string]envoy_types.Resource, len(currentResources))
toDelete := make([]string, 0, len(previouslyNotifiedResources))

for resourceName, previousResource := range previouslyNotifiedResources {
if newResource, ok := currentResources[resourceName]; ok {
// Add resources that were updated.
if !proto.Equal(newResource, previousResource) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're effectively trading the req/resp expense for this comparison expense to ensure we don't send unnecessary updates

toUpdate[resourceName] = newResource
}
} else {
// Add resources that were deleted.
toDelete = append(toDelete, resourceName)
}
}

resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
envoy_resource_v3.EndpointType: asResources(s.resources[envoy_resource_v3.EndpointType].Contents()),
// Add resources that are new.
for resourceName, newResource := range currentResources {
if _, ok := previouslyNotifiedResources[resourceName]; !ok {
toUpdate[resourceName] = newResource
}
}

snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
if err != nil {
s.log.Errorf("failed to generate snapshot version %q: %s", version, err)
if len(toUpdate) == 0 && len(toDelete) == 0 {
s.log.Debug("no EDS resources to update")
return
}

if err := s.edsCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
return
s.log.WithField("toUpdate", len(toUpdate)).WithField("toDelete", len(toDelete)).Debug("refreshing EDS cache")

if err := s.edsCache.UpdateResources(toUpdate, toDelete); err != nil {
s.log.WithError(err).Error("failed to update EDS cache")

Check warning on line 130 in internal/xdscache/v3/snapshot.go

View check run for this annotation

Codecov / codecov/patch

internal/xdscache/v3/snapshot.go#L130

Added line #L130 was not covered by tests
}
}

Expand Down