Skip to content

Commit a79f17d

Browse files
committed
Fix race condition in updateStats()
1 parent cf36b3a commit a79f17d

File tree

3 files changed

+43
-16
lines changed

3 files changed

+43
-16
lines changed

db/change_cache.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ func (c *changeCache) updateStats(ctx context.Context) {
8989
c.db.DbStats.Cache().PendingSeqLen.Set(int64(c.internalStats.pendingSeqLen))
9090
c.db.DbStats.CBLReplicationPull().MaxPending.SetIfMax(int64(c.internalStats.maxPending))
9191
c.db.DbStats.Cache().HighSeqStable.Set(int64(c._getMaxStableCached(ctx)))
92-
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(len(c.skippedSeqs.list)))
93-
c.db.DbStats.Cache().SkippedSeqCap.Set(int64(cap(c.skippedSeqs.list)))
94-
c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.NumCumulativeSkippedSequences)
95-
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.NumCurrentSkippedSequences)
92+
c.db.DbStats.Cache().SkippedSeqLen.Set(c.skippedSeqs.getSliceLength())
93+
c.db.DbStats.Cache().SkippedSeqCap.Set(c.skippedSeqs.getSliceCapacity())
94+
c.db.DbStats.Cache().NumSkippedSeqs.Set(c.skippedSeqs.getCumulativeNumSkippedSequenceValue())
95+
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(c.skippedSeqs.getNumCurrentSkippedSequenceValue())
9696

9797
}
9898

@@ -314,7 +314,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {
314314
}
315315

316316
c.db.DbStats.Cache().AbandonedSeqs.Add(compactedSequences)
317-
c.skippedSeqs.NumCurrentSkippedSequences -= compactedSequences
318317

319318
base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Cleaned %d sequences from skipped list for database %s.", compactedSequences, base.MD(c.db.Name))
320319
return nil
@@ -878,9 +877,6 @@ func (h *LogPriorityQueue) Pop() interface{} {
878877

879878
func (c *changeCache) RemoveSkipped(x uint64) error {
880879
err := c.skippedSeqs.removeSeq(x)
881-
if err == nil {
882-
c.skippedSeqs.NumCurrentSkippedSequences -= 1
883-
}
884880
return err
885881
}
886882

@@ -894,11 +890,7 @@ func (c *changeCache) WasSkipped(x uint64) bool {
894890
}
895891

896892
func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq uint64) {
897-
newEntry := NewSkippedSequenceRangeEntry(startSeq, endSeq)
898-
numSequences := newEntry.getNumSequencesInEntry()
899-
c.skippedSeqs.NumCurrentSkippedSequences += numSequences
900-
c.skippedSeqs.NumCumulativeSkippedSequences += numSequences
901-
c.skippedSeqs.PushSkippedSequenceEntry(newEntry)
893+
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
902894
}
903895

904896
// waitForSequence blocks up to maxWaitTime until the given sequence has been received.

db/change_cache_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,9 +2334,6 @@ func TestSkippedSequenceCompact(t *testing.T) {
23342334
}
23352335
_ = testChangeCache.processEntry(ctx, highEntry)
23362336

2337-
// update cache stats for assertions
2338-
testChangeCache.updateStats(ctx)
2339-
23402337
// assert this pushes an entry on the skipped sequence slice
23412338
require.EventuallyWithT(t, func(c *assert.CollectT) {
23422339
assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list))

db/skipped_sequence.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ func (s *SkippedSequenceSlice) SkippedSequenceCompact(ctx context.Context, maxWa
165165
}
166166
// resize slice to reclaim memory if we need to
167167
s._clip(ctx)
168+
// decrement number of current skipped sequences by the number of sequences compacted
169+
s.NumCurrentSkippedSequences -= numSequencesCompacted
170+
168171
return numSequencesCompacted
169172
}
170173

@@ -209,6 +212,8 @@ func (s *SkippedSequenceSlice) removeSeq(x uint64) error {
209212
if !found {
210213
return fmt.Errorf("sequence %d not found in the skipped list", x)
211214
}
215+
// if found we need to decrement the current num skipped sequences stat
216+
s.NumCurrentSkippedSequences -= 1
212217

213218
// take the element at the index and handle cases required to removal of a sequence
214219
rangeElem := s.list[index]
@@ -264,6 +269,11 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceLi
264269
s.lock.Lock()
265270
defer s.lock.Unlock()
266271

272+
// update num current skipped sequences count + the cumulative count of skipped sequences
273+
numSequencesIncoming := entry.getNumSequencesInEntry()
274+
s.NumCurrentSkippedSequences += numSequencesIncoming
275+
s.NumCumulativeSkippedSequences += numSequencesIncoming
276+
267277
if len(s.list) == 0 {
268278
s.list = append(s.list, entry)
269279
return
@@ -291,3 +301,31 @@ func (s *SkippedSequenceSlice) getOldest() uint64 {
291301
// grab fist element in slice and take the start seq of that range/single sequence
292302
return s.list[0].getStartSeq()
293303
}
304+
305+
// getSliceLength retrieves the current skipped sequence slice length
306+
func (s *SkippedSequenceSlice) getSliceLength() int64 {
307+
s.lock.RLock()
308+
defer s.lock.RUnlock()
309+
return int64(len(s.list))
310+
}
311+
312+
// getSliceCapacity retrieves the current skipped sequence slice capacity
313+
func (s *SkippedSequenceSlice) getSliceCapacity() int64 {
314+
s.lock.RLock()
315+
defer s.lock.RUnlock()
316+
return int64(cap(s.list))
317+
}
318+
319+
// getNumCurrentSkippedSequenceValue retrieves the current skipped sequence count
320+
func (s *SkippedSequenceSlice) getNumCurrentSkippedSequenceValue() int64 {
321+
s.lock.RLock()
322+
defer s.lock.RUnlock()
323+
return s.NumCurrentSkippedSequences
324+
}
325+
326+
// getCumulativeNumSkippedSequenceValue retrieves the cumulative skipped sequence count
327+
func (s *SkippedSequenceSlice) getCumulativeNumSkippedSequenceValue() int64 {
328+
s.lock.RLock()
329+
defer s.lock.RUnlock()
330+
return s.NumCumulativeSkippedSequences
331+
}

0 commit comments

Comments
 (0)