Skip to content

Commit afdb499

Browse files
azdagronstevend-uber
authored andcommitted
Fix regression notifying workloads when entry removed (spiffe#3923)
PR spiffe#2305 fixed spurious notifications of workloads when nothing had changed but unfortunately introduced a regression wherein a workload is no longer notified by the cache when an entry for that workload is removed. The bug is caused by false sharing of the selRem temporary selector set. Previously selRem was used to build the selectors for entries being removed and the the contents of selRem were merged into the single notification. When multiple notification sets were introduced, selRem was added as a notification set. Unfortunately, selRem is cleared while processing entries, causing the notification set to be empty. Existing unit-tests did not catch this because the update that removes the existing entry did not have additional entries to be processed (that would cause selRem to be cleared). This PR fixes the bug by allocating a new selector set to be appended to the notification set instead of using selRem. It also cleans up some selector set usage and adds some additional logic to the unit-test so this condition can be caught in the future. Fixes: spiffe#3922 Signed-off-by: Andrew Harding <[email protected]>
1 parent 1ba29b2 commit afdb499

File tree

4 files changed

+60
-28
lines changed

4 files changed

+60
-28
lines changed

pkg/agent/manager/cache/cache.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,10 @@ func (c *Cache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.Regi
271271
// built a set of selectors for the record being removed, drop the
272272
// record for each selector index, and add the entry selectors to
273273
// the notify set.
274-
clearSelectorSet(selRem)
275-
selRem.Merge(record.entry.Selectors...)
276-
c.delSelectorIndicesRecord(selRem, record)
277-
notifySets = append(notifySets, selRem)
274+
notifySet, notifySetDone := allocSelectorSet(record.entry.Selectors...)
275+
defer notifySetDone()
276+
c.delSelectorIndicesRecord(notifySet, record)
277+
notifySets = append(notifySets, notifySet)
278278
delete(c.records, id)
279279
// Remove stale entry since, registration entry is no longer on cache.
280280
delete(c.staleEntries, id)
@@ -324,17 +324,15 @@ func (c *Cache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.Regi
324324
// are notified.
325325
if selectorsChanged {
326326
if existingEntry != nil {
327-
notifySet, selSetDone := allocSelectorSet()
328-
defer selSetDone()
329-
notifySet.Merge(existingEntry.Selectors...)
327+
notifySet, notifySetDone := allocSelectorSet(existingEntry.Selectors...)
328+
defer notifySetDone()
330329
notifySets = append(notifySets, notifySet)
331330
}
332331
}
333332

334333
if federatedBundlesChanged || selectorsChanged {
335-
notifySet, selSetDone := allocSelectorSet()
336-
defer selSetDone()
337-
notifySet.Merge(newEntry.Selectors...)
334+
notifySet, notifySetDone := allocSelectorSet(newEntry.Selectors...)
335+
defer notifySetDone()
338336
notifySets = append(notifySets, notifySet)
339337
}
340338

@@ -385,8 +383,8 @@ func (c *Cache) UpdateSVIDs(update *UpdateSVIDs) {
385383
defer c.mu.Unlock()
386384

387385
// Allocate a set of selectors that
388-
notifySet, selSetDone := allocSelectorSet()
389-
defer selSetDone()
386+
notifySet, notifySetDone := allocSelectorSet()
387+
defer notifySetDone()
390388

391389
// Add/update records for registration entries in the update
392390
for entryID, svid := range update.X509SVIDs {

pkg/agent/manager/cache/cache_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ func TestSubcriberNotifiedWhenEntryDropped(t *testing.T) {
400400
assertAnyWorkloadUpdate(t, subB)
401401

402402
foo := makeRegistrationEntry("FOO", "A")
403+
bar := makeRegistrationEntry("BAR", "B")
404+
403405
updateEntries := &UpdateEntries{
404406
Bundles: makeBundles(bundleV1),
405407
RegistrationEntries: makeRegistrationEntries(foo),
@@ -408,19 +410,35 @@ func TestSubcriberNotifiedWhenEntryDropped(t *testing.T) {
408410
cache.UpdateSVIDs(&UpdateSVIDs{
409411
X509SVIDs: makeX509SVIDs(foo),
410412
})
413+
411414
// make sure subA gets notified with FOO but not subB
412415
assertWorkloadUpdateEqual(t, subA, &WorkloadUpdate{
413416
Bundle: bundleV1,
414417
Identities: []Identity{{Entry: foo}},
415418
})
416419
assertNoWorkloadUpdate(t, subB)
417420

418-
updateEntries.RegistrationEntries = nil
421+
// Swap out FOO for BAR
422+
updateEntries.RegistrationEntries = makeRegistrationEntries(bar)
419423
cache.UpdateEntries(updateEntries, nil)
424+
cache.UpdateSVIDs(&UpdateSVIDs{
425+
X509SVIDs: makeX509SVIDs(bar),
426+
})
420427
assertWorkloadUpdateEqual(t, subA, &WorkloadUpdate{
421428
Bundle: bundleV1,
422429
})
423-
assertNoWorkloadUpdate(t, subB)
430+
assertWorkloadUpdateEqual(t, subB, &WorkloadUpdate{
431+
Bundle: bundleV1,
432+
Identities: []Identity{{Entry: bar}},
433+
})
434+
435+
// Drop both
436+
updateEntries.RegistrationEntries = nil
437+
cache.UpdateEntries(updateEntries, nil)
438+
assertNoWorkloadUpdate(t, subA)
439+
assertWorkloadUpdateEqual(t, subB, &WorkloadUpdate{
440+
Bundle: bundleV1,
441+
})
424442

425443
// Make sure trying to update SVIDs of removed entry does not notify
426444
cache.UpdateSVIDs(&UpdateSVIDs{

pkg/agent/manager/cache/lru_cache.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,10 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
273273
// built a set of selectors for the record being removed, drop the
274274
// record for each selector index, and add the entry selectors to
275275
// the notify set.
276-
clearSelectorSet(selRem)
277-
selRem.Merge(record.entry.Selectors...)
278-
c.delSelectorIndicesRecord(selRem, record)
279-
notifySets = append(notifySets, selRem)
276+
notifySet, notifySetDone := allocSelectorSet(record.entry.Selectors...)
277+
defer notifySetDone()
278+
c.delSelectorIndicesRecord(notifySet, record)
279+
notifySets = append(notifySets, notifySet)
280280
delete(c.records, id)
281281
delete(c.svids, id)
282282
// Remove stale entry since, registration entry is no longer on cache.
@@ -329,17 +329,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
329329
// are notified.
330330
if selectorsChanged {
331331
if existingEntry != nil {
332-
notifySet, selSetDone := allocSelectorSet()
333-
defer selSetDone()
334-
notifySet.Merge(existingEntry.Selectors...)
332+
notifySet, notifySetDone := allocSelectorSet(existingEntry.Selectors...)
333+
defer notifySetDone()
335334
notifySets = append(notifySets, notifySet)
336335
}
337336
}
338337

339338
if federatedBundlesChanged || selectorsChanged {
340-
notifySet, selSetDone := allocSelectorSet()
341-
defer selSetDone()
342-
notifySet.Merge(newEntry.Selectors...)
339+
notifySet, notifySetDone := allocSelectorSet(newEntry.Selectors...)
340+
defer notifySetDone()
343341
notifySets = append(notifySets, notifySet)
344342
}
345343

@@ -427,8 +425,8 @@ func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
427425
defer c.mu.Unlock()
428426

429427
// Allocate a set of selectors that
430-
notifySet, selSetDone := allocSelectorSet()
431-
defer selSetDone()
428+
notifySet, notifySetDone := allocSelectorSet()
429+
defer notifySetDone()
432430

433431
// Add/update records for registration entries in the update
434432
for entryID, svid := range update.X509SVIDs {

pkg/agent/manager/cache/lru_cache_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,8 @@ func TestLRUCacheSubscriberNotifiedWhenEntryDropped(t *testing.T) {
387387
assertAnyWorkloadUpdate(t, subB)
388388

389389
foo := makeRegistrationEntry("FOO", "A")
390+
bar := makeRegistrationEntry("BAR", "B")
391+
390392
updateEntries := &UpdateEntries{
391393
Bundles: makeBundles(bundleV1),
392394
RegistrationEntries: makeRegistrationEntries(foo),
@@ -395,19 +397,35 @@ func TestLRUCacheSubscriberNotifiedWhenEntryDropped(t *testing.T) {
395397
cache.UpdateSVIDs(&UpdateSVIDs{
396398
X509SVIDs: makeX509SVIDs(foo),
397399
})
400+
398401
// make sure subA gets notified with FOO but not subB
399402
assertWorkloadUpdateEqual(t, subA, &WorkloadUpdate{
400403
Bundle: bundleV1,
401404
Identities: []Identity{{Entry: foo}},
402405
})
403406
assertNoWorkloadUpdate(t, subB)
404407

405-
updateEntries.RegistrationEntries = nil
408+
// Swap out FOO for BAR
409+
updateEntries.RegistrationEntries = makeRegistrationEntries(bar)
406410
cache.UpdateEntries(updateEntries, nil)
411+
cache.UpdateSVIDs(&UpdateSVIDs{
412+
X509SVIDs: makeX509SVIDs(bar),
413+
})
407414
assertWorkloadUpdateEqual(t, subA, &WorkloadUpdate{
408415
Bundle: bundleV1,
409416
})
410-
assertNoWorkloadUpdate(t, subB)
417+
assertWorkloadUpdateEqual(t, subB, &WorkloadUpdate{
418+
Bundle: bundleV1,
419+
Identities: []Identity{{Entry: bar}},
420+
})
421+
422+
// Drop both
423+
updateEntries.RegistrationEntries = nil
424+
cache.UpdateEntries(updateEntries, nil)
425+
assertNoWorkloadUpdate(t, subA)
426+
assertWorkloadUpdateEqual(t, subB, &WorkloadUpdate{
427+
Bundle: bundleV1,
428+
})
411429

412430
// Make sure trying to update SVIDs of removed entry does not notify
413431
cache.UpdateSVIDs(&UpdateSVIDs{

0 commit comments

Comments
 (0)