Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,9 @@ func (c *changeCache) RemoveSkipped(x uint64) error {
}

// Removes a set of sequences. Logs warning on removal error, returns count of successfully removed.
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) {
// this can be used for removal or ranges, pending CBG-3855
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, startSeq, endSeq uint64) error {
err := c.skippedSeqs.removeSeqRange(startSeq, endSeq)
return err
}

func (c *changeCache) WasSkipped(x uint64) bool {
Expand Down
50 changes: 50 additions & 0 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,56 @@ func binarySearchFunc(a *SkippedSequenceListEntry, seq uint64) int {
return -1
}

// removeSeqRange will remove sequence between x and y from the skipped sequence slice
func (s *SkippedSequenceSlice) removeSeqRange(startSeq, endSeq uint64) error {
s.lock.Lock()
defer s.lock.Unlock()

startIndex, found := s._findSequence(startSeq)
if !found {
return fmt.Errorf("sequence %d not found in the skipped list", startSeq)
}
endIndex, found := s._findSequence(endSeq)
if !found {
return fmt.Errorf("sequence %d not found in the skipped list", startSeq)
}

// if both sequence x and y aren't in the same element, the range contains sequences that are not present in
// skipped sequence list. This is due to any contiguous sequences in the list will be in the same element. So
// if startSeq and endSeq are in different elements there is at least one sequence between these values not in the list
if startIndex != endIndex {
return fmt.Errorf("sequence range %d to %d specified has sequences in that are not present in skipped list", startSeq, endSeq)
}

// handle sequence range removal
rangeElem := s.list[startIndex]
// update number of sequences currently in slice stat
rangeRemoved := SkippedSequenceListEntry{start: startSeq, end: endSeq}
s.NumCurrentSkippedSequences -= rangeRemoved.getNumSequencesInEntry()

// if single sequence element
if rangeElem.getStartSeq() == startSeq && rangeElem.getLastSeq() == endSeq {
// simply remove the element
s.list = slices.Delete(s.list, startIndex, startIndex+1)
return nil
}
// check if one of x or y is equal to start or end seq
if rangeElem.getStartSeq() == startSeq {
rangeElem.setStartSeq(endSeq + 1)
return nil
}
if rangeElem.getLastSeq() == endSeq {
rangeElem.setLastSeq(startSeq - 1)
return nil
}
// assume we are removing from middle of the range
newElem := NewSkippedSequenceRangeEntryAt(endSeq+1, rangeElem.getLastSeq(), rangeElem.getTimestamp())
s._insert(startIndex+1, newElem)
rangeElem.setLastSeq(startSeq - 1)
return nil

}

// removeSeq will remove a given sequence from the slice if it exists
func (s *SkippedSequenceSlice) removeSeq(x uint64) error {
s.lock.Lock()
Expand Down
139 changes: 139 additions & 0 deletions db/skipped_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,13 @@ func BenchmarkRemoveSeqFromSkippedList(b *testing.B) {
}
}

func BenchmarkRemoveSeqRangeFromSkippedList(b *testing.B) {
skipedSlice := setupBenchmark(true, true, DefaultClipCapacityHeadroom)
for i := 0; i < b.N; i++ {
_ = skipedSlice.removeSeqRange(uint64(i*2), uint64(i*2)+5)
}
}

// TestInsertItemInSlice:
// - Create skipped sequence slice
// - Insert a new value in the slice at index specified to maintain order
Expand Down Expand Up @@ -703,6 +710,138 @@ func TestGetOldestSkippedSequence(t *testing.T) {
}
}

// TestRemoveSequenceRange:
// - Setup skipped list
// - Remove the range specified in the test case
// - Assert on error returned from removeSeqRange depending on whether testcase is a error case
// - Assert on expected resulting skipped list
// - Assert on number of skipped sequences in the list after removal
func TestRemoveSequenceRange(t *testing.T) {
testCases := []struct {
name string
inputList [][]uint64
expected [][]uint64
expectedNumSequencesInSlice int64
rangeToRemove []uint64
errorCase bool
}{
{
name: "x_startSeq_on_range",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {29, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{25, 28},
expectedNumSequencesInSlice: 26,
},
{
name: "y_endSeq_on_range",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 26}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{27, 30},
expectedNumSequencesInSlice: 26,
},
{
name: "x_y_startSeq_endSeq_on_range",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {45, 50}},
rangeToRemove: []uint64{35, 40},
expectedNumSequencesInSlice: 24,
},
{
name: "x_y_in_middle_of_range",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 36}, {39, 40}, {45, 50}},
rangeToRemove: []uint64{37, 38},
expectedNumSequencesInSlice: 28,
},
{
name: "x+1_from_startSeq_y_-1_from_startSeq",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 15}, {20, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{16, 19},
expectedNumSequencesInSlice: 26,
},
{
name: "single_sequence_removed",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}, {55}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{55, 55},
expectedNumSequencesInSlice: 30,
},
{
name: "single_sequence_removed_from_range",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 26}, {28, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{27, 27},
expectedNumSequencesInSlice: 29,
},
{
name: "x_y_across_multiple_elements",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{26, 37},
expectedNumSequencesInSlice: 30,
errorCase: true,
},
{
name: "x_does_not_exist_in_list",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{32, 35},
expectedNumSequencesInSlice: 30,
errorCase: true,
},
{
name: "y_does_not_exist_in_list",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{25, 31},
expectedNumSequencesInSlice: 30,
errorCase: true,
},
{
name: "x_and_y_do_not_exist_in_list",
inputList: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
expected: [][]uint64{{5, 10}, {15, 20}, {25, 30}, {35, 40}, {45, 50}},
rangeToRemove: []uint64{31, 33},
expectedNumSequencesInSlice: 30,
errorCase: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom)
for _, input := range testCase.inputList {
if len(input) == 1 {
skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(input[0]))
} else {
skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(input[0], input[1]))
}
}

err := skippedSlice.removeSeqRange(testCase.rangeToRemove[0], testCase.rangeToRemove[1])
if testCase.errorCase {
require.Error(t, err)
} else {
require.NoError(t, err)
}

for i := 0; i < len(skippedSlice.list); i++ {
if testCase.expected[i][0] == testCase.expected[i][1] {
assert.True(t, skippedSlice.list[i].singleEntry())
} else {
assert.False(t, skippedSlice.list[i].singleEntry())
}
// if we expect range at this index, assert on it
assert.Equal(t, testCase.expected[i][0], skippedSlice.list[i].getStartSeq())
assert.Equal(t, testCase.expected[i][1], skippedSlice.list[i].getLastSeq())
}
// assert on current count of skipped sequences
assert.Equal(t, testCase.expectedNumSequencesInSlice, skippedSlice.NumCurrentSkippedSequences)
})
}

}

// setupBenchmark sets up a skipped sequence slice for benchmark tests
func setupBenchmark(largeSlice bool, rangeEntries bool, clipHeadroom int) *SkippedSequenceSlice {
skippedSlice := NewSkippedSequenceSlice(clipHeadroom)
Expand Down