
Commit 9659769

Fixing memory leak in KDD watchers
When we create a cluster without our CRDs, or remove one from a running cluster, the syncer starts retrying the watch on that non-existent resource. Each pass through that retry loop triggers a "resync", which destroys and then recreates all of the old watchers. That churn kicks off the leak, which stems from somewhere in client-go and could be caused by memory fragmentation from rapidly creating and destroying the watches and their underlying channels. We now only close out the watchers that actually needed a resync, which prevents us from restarting watches that never needed to be stopped.
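The heart of the change shows up in the syncer.go diff below: instead of calling syn.closeAllWatchers() on every resync, the loop now stops only the watchers whose resource is flagged in the needsResync map, and it clears each flag when the corresponding watcher is recreated. The following is a minimal, self-contained Go sketch of that pattern, not the real syncer code: fakeWatcher, resyncLoopOnce, and the plain string keys are illustrative stand-ins for the client-go watch.Interface values the syncer keeps keyed by its KEY_* constants.

// Minimal sketch of the selective-close pattern; all names here are
// illustrative stand-ins, not the real kubeSyncer types.
package main

import "fmt"

type fakeWatcher struct {
	key     string
	stopped bool
}

func (w *fakeWatcher) Stop() { w.stopped = true }

// resyncLoopOnce models one pass of the read loop: stop only the watchers
// whose resource was flagged for resync, then (re)create any watcher that is
// missing and clear its flag.
func resyncLoopOnce(open map[string]*fakeWatcher, needsResync map[string]bool, keys []string) {
	// Find out if anything at all needs a resync.
	needSync := false
	for _, resync := range needsResync {
		if resync {
			needSync = true
			break
		}
	}

	if needSync {
		// Close out only the watches that needed resync; healthy watchers
		// are left running, which is the point of the fix.
		for k, resync := range needsResync {
			if w, exists := open[k]; exists && resync {
				w.Stop()
				delete(open, k)
			}
		}
	}

	// Recreate whatever is not currently open and clear its resync flag.
	for _, k := range keys {
		if _, exists := open[k]; !exists {
			open[k] = &fakeWatcher{key: k}
			needsResync[k] = false
		}
	}
}

func main() {
	keys := []string{"namespace", "pod", "networkpolicy"}
	open := map[string]*fakeWatcher{}
	needsResync := map[string]bool{}

	// First pass creates every watcher.
	resyncLoopOnce(open, needsResync, keys)

	// Simulate a watch error on the pod resource only.
	oldPodWatcher := open["pod"]
	needsResync["pod"] = true
	resyncLoopOnce(open, needsResync, keys)

	fmt.Println("old pod watcher stopped:", oldPodWatcher.stopped) // true
	fmt.Println("watchers still open:", len(open))                 // 3
}

Running the sketch stops only the old pod watcher while the other watchers stay open, mirroring the updated test expectation of one extra watch call per simulated error.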
1 parent 2fe92ae commit 9659769

2 files changed: 45 additions & 18 deletions

lib/backend/k8s/syncer.go

Lines changed: 26 additions & 8 deletions
@@ -308,8 +308,19 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 
 	log.Info("Starting Kubernetes API read loop")
 	for {
+		needSync := false
+
+		// Find out if we need to resync.
+		for _, resync := range syn.needsResync {
+			// We found something that needs resync, we can stop and move on.
+			if resync {
+				needSync = true
+				break
+			}
+		}
+
 		// If we need to resync, do so.
-		if len(syn.needsResync) != 0 {
+		if needSync {
 			// Set status to ResyncInProgress.
 			log.Debugf("Resync required - latest versions: %+v", latestVersions)
 			syn.callbacks.OnStatusUpdated(api.ResyncInProgress)
@@ -336,9 +347,12 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 				return
 			}
 
-			// Close the previous crop of watchers to avoid leaking resources when we
-			// recreate them below.
-			syn.closeAllWatchers()
+			// Close out any watches that needed resync.
+			for k, resync := range syn.needsResync {
+				if _, exists := syn.openWatchers[k]; exists && resync {
+					syn.closeWatcher(k)
+				}
+			}
 		}
 
 		// Create the Kubernetes API watchers.
@@ -353,6 +367,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_NS] = nsWatch
 			nsChan = nsWatch.ResultChan()
+			syn.needsResync[KEY_NS] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_PO]; !exists {
@@ -366,6 +381,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_PO] = poWatch
 			poChan = poWatch.ResultChan()
+			syn.needsResync[KEY_PO] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_NP]; !exists {
@@ -380,6 +396,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_NP] = npWatch
 			npChan = npWatch.ResultChan()
+			syn.needsResync[KEY_NP] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_GNP]; !exists {
@@ -394,6 +411,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_GNP] = gnpWatch
 			gnpChan = gnpWatch.ResultChan()
+			syn.needsResync[KEY_GNP] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_GC]; !exists {
@@ -408,6 +426,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_GC] = globalFelixConfigWatch
 			gcChan = globalFelixConfigWatch.ResultChan()
+			syn.needsResync[KEY_GC] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_IP]; !exists {
@@ -422,6 +441,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_IP] = ipPoolWatch
 			poolChan = ipPoolWatch.ResultChan()
+			syn.needsResync[KEY_IP] = false
 		}
 
 		if _, exists := syn.openWatchers[KEY_NO]; !exists && !syn.disableNodePoll {
@@ -436,12 +456,10 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			}
 			syn.openWatchers[KEY_NO] = nodeWatch
 			noChan = nodeWatch.ResultChan()
+			syn.needsResync[KEY_NO] = false
+			syn.needsResync[KEY_HC] = false
 		}
 
-		// We resynced if we needed to, and have a complete set of watchers, so reset the
-		// needsResync flag.
-		syn.needsResync = map[string]bool{}
-
 		// Select on the various watch channels.
 		select {
 		case <-syn.stopChan:

lib/backend/k8s/syncer_test.go

Lines changed: 19 additions & 10 deletions
@@ -323,19 +323,19 @@ var _ = Describe("Test Syncer", func() {
 			// Simulate error on pod watch.
 			tc.podC <- watch.Event{Type: watch.Error, Object: nil}
 			// Expect a single new list call, but that each watcher is restarted.
-			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+7))
+			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+1))
 			Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+1))
 
 			// Simulate error on IP Pool watch.
 			tc.poolC <- watch.Event{Type: watch.Error, Object: nil}
 			// Expect a single new list call, but that each watcher is restarted.
-			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+14))
+			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+2))
 			Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+2))
 
 			// Simulate empty event on IP Pool watch (resourceVersion too old for TPRs)
 			tc.poolC <- watch.Event{Object: nil}
 			// Expect a single new list call, but that each watcher is restarted.
-			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+21))
+			Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+3))
 			Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+3))
 		})
 
@@ -383,15 +383,24 @@ var _ = Describe("Test Syncer", func() {
 			// Check that, after the resync, the old watchers are stopped.
 			tc.stateMutex.Lock()
 			defer tc.stateMutex.Unlock()
-			// We expect 7 old watchers and 7 new. If that changes, we'll assert here
+			// We expect 7 old watchers and 1 new. If that changes, we'll assert here
 			// so the maintainer can re-check the test still matches the logic.
-			Expect(tc.openWatchers).To(HaveLen(14))
-			for _, w := range tc.openWatchers[:len(tc.openWatchers)/2] {
-				w.stopMutex.Lock()
-				stopped := w.stopped
-				w.stopMutex.Unlock()
-				Expect(stopped).To(BeTrue())
+			Expect(tc.openWatchers).To(HaveLen(8))
+
+			// Check and verify the old pod watcher was closed, make sure we ignore the
+			// newest pod watch that was added by only iterating over the old watchers.
+			closed := false
+			for _, w := range tc.openWatchers[:len(tc.openWatchers)-1] {
+				if w.name == "pod" {
+					w.stopMutex.Lock()
+					stopped := w.stopped
+					w.stopMutex.Unlock()
+					Expect(stopped).To(BeTrue())
+					closed = true
+				}
 			}
+			// If for some reason we never found a pod watch we should fail.
+			Expect(closed).To(BeTrue())
 		})
 	})
 })
