
Commit 31cb61f

devanbenz and philjb authored
fix(tsm1): fix condition check for optimization of array cursor (#26691) (#26861)
Co-authored-by: Phil Bracikowski <[email protected]>
fixes #26690
1 parent 269f1b6 commit 31cb61f

File tree: 3 files changed (+172, -6 lines)


tsdb/engine/tsm1/array_cursor.gen.go

Lines changed: 5 additions & 5 deletions
@@ -110,7 +110,7 @@ func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.

@@ -402,7 +402,7 @@ func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.

@@ -694,7 +694,7 @@ func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.

@@ -986,7 +986,7 @@ func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.

@@ -1278,7 +1278,7 @@ func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.
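The same one-line change is applied to each of the five generated cursor types (float, integer, unsigned, string, boolean): the "serve everything straight from TSM" fast path now also requires that the current TSM block has not been partially consumed by an earlier Next() call. Below is a minimal, self-contained sketch of the guard's intent; the names (sketchCursor, resTimestamps, tsmTimestamps, tsmPos, canServeWholeBlock) are illustrative stand-ins, not the real generated cursor internals.

package main

import "fmt"

// sketchCursor is a hypothetical, simplified stand-in for the generated
// ascending array cursors in array_cursor.gen.go.
type sketchCursor struct {
	resTimestamps []int64 // response buffer for the current Next() call
	tsmTimestamps []int64 // timestamps of the current TSM block
	tsmPos        int     // how far into the TSM block earlier Next() calls got
}

// canServeWholeBlock reports whether the entire TSM block may be copied
// straight into the response buffer.
func (c *sketchCursor) canServeWholeBlock(pos int) bool {
	// Before the fix the c.tsmPos == 0 term was missing: if a previous Next()
	// had already consumed part of the block (tsmPos > 0), the shortcut
	// re-copied the block from its start and returned duplicate points.
	return pos == 0 && // nothing written into the response buffer yet
		c.tsmPos == 0 && // nothing read from this TSM block yet
		len(c.resTimestamps) >= len(c.tsmTimestamps) // block fits entirely
}

func main() {
	// Scenario from the regression test below: an earlier Next() mixed cache
	// and TSM points, leaving the TSM block partially consumed.
	c := &sketchCursor{
		resTimestamps: make([]int64, 1000), // tsdb.DefaultMaxPointsPerBlock
		tsmTimestamps: make([]int64, 8),
		tsmPos:        5,
	}
	// Prints false with the fixed guard; the pre-fix guard (without the
	// tsmPos check) would have returned true here and duplicated points.
	fmt.Println(c.canServeWholeBlock(0))
}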

tsdb/engine/tsm1/array_cursor.gen.go.tmpl

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ func (c *{{$type}}) Next() {{$arrayType}} {

	if pos < len(c.res.Timestamps) {
		if c.tsm.pos < len(tvals.Timestamps) {
-			if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps){
+			if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps){
				// optimization: all points can be served from TSM data because
				// we need the entire block and the block completely fits within
				// the buffer.
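array_cursor.gen.go is generated from this template, which is why the identical guard change appears once per value type in the generated file; presumably the generated code was refreshed from the template via the package's usual go generate setup rather than edited by hand in five places.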
New test file

Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
package tsm1

import (
	"context"
	"os"
	"testing"

	"github.com/influxdata/influxdb/tsdb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestAscendingCursorDuplicateDataBug demonstrates the real bug that existed in the ascending cursor
// optimization. The bug is encountered when cache data is merged with TSM data: in an earlier call to Next(),
// the cache must fill part, but not all, of the response buffer (1000 points), and TSM data then fills the
// rest; the cache must not be exhausted. The following Next() sees that only TSM data is needed and
// incorrectly copies TSM data, including the points already returned by the previous Next() call.
// Additionally, the cache data needs to be older than most of the TSM data, and there must be enough TSM
// data younger than the cache data to completely fill the remaining space in the response buffer.
func TestAscendingCursorDuplicateDataBug(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	fs := NewFileStore(dir)
	assert := assert.New(t)
	require := require.New(t)

	tsmMinTime := 10
	tsmMaxTime := 70000
	// Create TSM data
	tsmData := []keyValues{
		{"measurement,field=value#!~#value", []Value{
			NewFloatValue(int64(tsmMinTime), 1.1), // tsm points can be older than the cache points
			// these next 5 points are the critical tsm points. They will be used after the cache is exhausted
			// as they are younger than all the cache points.
			NewFloatValue(10000, 10.1),
			NewFloatValue(20000, 20.2),
			NewFloatValue(30000, 30.3),
			NewFloatValue(40000, 40.4),
			NewFloatValue(50000, 50.5),
			// there can be more tsm values - additional values here don't impact the result
			NewFloatValue(60000, 60.5),
			NewFloatValue(int64(tsmMaxTime), 70.5),
		}},
	}

	files, err := newFiles(dir, tsmData...)
	require.NoError(err, "error creating files")

	err = fs.Replace(nil, files)
	require.NoError(err, "error updating filestore")

	// Create cache data that triggers the bug scenario:
	// Cache fills most of the buffer, TSM provides remaining data
	var cacheValues Values
	bufferSize := tsdb.DefaultMaxPointsPerBlock

	// Cache: 100 to 100+bufferSize-5 (leaving room for some TSM data)
	// The cache data timestamps are older than the TSM data timestamps
	for i := 0; i < bufferSize-5; i++ {
		cacheValues = append(cacheValues, NewFloatValue(int64(100+i), float64(100+i)))
	}

	// Verify precondition: TSM and cache timestamps must not overlap
	tsmTimestamps := make(map[int64]bool)
	for _, v := range tsmData[0].values {
		tsmTimestamps[v.UnixNano()] = true
	}

	for _, cv := range cacheValues {
		require.Falsef(tsmTimestamps[cv.UnixNano()], "Test precondition failed: TSM and cache data share timestamp %d. This will interfere with test results.", cv.UnixNano())
	}
	// Verify the cache fills most but not all of the buffer
	require.Lessf(len(cacheValues), bufferSize, "Test precondition failed: Cache must not fill entire buffer (size %d), has %d values", bufferSize, len(cacheValues))

	// Verify cache has sufficient data to trigger the bug
	minCacheSize := bufferSize - 10 // Leave room for at least some TSM data
	require.GreaterOrEqualf(len(cacheValues), minCacheSize, "Test precondition failed: Cache must have at least %d values to trigger bug, has %d", minCacheSize, len(cacheValues))

	// Verify TSM has data both before and after cache range
	var tsmBeforeCache, tsmAfterCache int
	cacheMin, cacheMax := cacheValues[0].UnixNano(), cacheValues[len(cacheValues)-1].UnixNano()
	for _, v := range tsmData[0].values {
		if v.UnixNano() < cacheMin {
			tsmBeforeCache++
		} else if v.UnixNano() > cacheMax {
			tsmAfterCache++
		}
	}
	require.GreaterOrEqualf(tsmAfterCache, 2, "Test precondition failed: Need at least 2 TSM points after cache range to trigger bug, has %d", tsmAfterCache)

	kc := fs.KeyCursor(context.Background(), []byte("measurement,field=value#!~#value"), 0, true)
	defer kc.Close()

	cursor := newFloatArrayAscendingCursor()
	defer cursor.Close()

	// Find min and max timestamps across both TSM and cache data
	minTime := int64(tsmMinTime) // Start with first TSM timestamp
	maxTime := int64(tsmMaxTime) // Start with last TSM timestamp

	// Check if cache has earlier timestamps
	if len(cacheValues) > 0 && cacheValues[0].UnixNano() < minTime {
		minTime = cacheValues[0].UnixNano()
	}

	// Check if cache has later timestamps
	if len(cacheValues) > 0 && cacheValues[len(cacheValues)-1].UnixNano() > maxTime {
		maxTime = cacheValues[len(cacheValues)-1].UnixNano()
	}

	// Add buffer to ensure we capture all data
	maxTime += 1000

	// search over the whole time range, ascending
	err = cursor.reset(minTime, maxTime, cacheValues, kc)
	require.NoError(err, "error resetting cursor")

	// Collect all timestamps and values from all Next() calls
	var allTimestamps []int64
	var allValues []float64
	seenTimestamps := make(map[int64]bool)
	callNum := 0

	for {
		result := cursor.Next() // run to exhaustion of the cursor
		if len(result.Timestamps) == 0 {
			break
		}
		callNum++

		// Verify timestamp/value counts match
		assert.Equalf(len(result.Timestamps), len(result.Values), "Call %d: Timestamp/value count mismatch: %d timestamps, %d values",
			callNum, len(result.Timestamps), len(result.Values))

		for i, ts := range result.Timestamps {
			if seenTimestamps[ts] {
				assert.Failf("DUPLICATE DATA BUG", "at call %d, position %d: Timestamp %d returned multiple times",
					callNum, i, ts)
				// Print first few timestamps from this batch for debugging
				if len(result.Timestamps) > 0 {
					endIdx := 10
					if endIdx > len(result.Timestamps) {
						endIdx = len(result.Timestamps)
					}
					t.Logf("First %d timestamps from call %d: %v", endIdx, callNum, result.Timestamps[:endIdx])
				}
			}
			seenTimestamps[ts] = true
			allTimestamps = append(allTimestamps, ts)
		}

		// Collect values for verification
		allValues = append(allValues, result.Values...)
	}

	// Verify no duplicates were found
	assert.Equalf(len(seenTimestamps), len(allTimestamps), "Found duplicate timestamps! Total: %d, Unique: %d", len(allTimestamps), len(seenTimestamps))

	// Verify we got all expected data (cache + TSM)
	assert.Equalf(len(cacheValues)+len(tsmData[0].values), len(allTimestamps), "total timestamps")

	// Verify data is in ascending order
	for i := 1; i < len(allTimestamps); i++ {
		assert.Greaterf(allTimestamps[i], allTimestamps[i-1], "Timestamps not in ascending order: %d followed by %d", allTimestamps[i-1], allTimestamps[i])
	}
}
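Assuming a standard checkout of the repository, the regression test can be run on its own with something like:

go test -run TestAscendingCursorDuplicateDataBug ./tsdb/engine/tsm1/

With the old condition the test fails on the "DUPLICATE DATA BUG" assertion; with the added c.tsm.pos == 0 check it passes.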
