Commit 617d94f

philjb authored and devanbenz committed
fix(tsm1): fix condition check for optimization of array cursor (#26691)
* fix(tsm1): fix condition check for optimization of array cursor

  The array ascending cursors do not check all the required conditions before doing an optimization that copies a full 1000 points of data from the tsm read buffer to the cursor's response buffer. This can result in points from tsm files appearing in cursor output multiple times. The conditions under which the bug appears are rare, as it requires cache data that is older than (or overlapping with) tsm data. The data must be in the same shard and must be for the same key.

* fixes #26690

* chore(tsm1): add test that covers array cursor bug

  This commit adds a test that verifies the duplicate data bug fix. It will fail without the previous commit. See the test for details; in short, it sets up the right cache and tsm data to expose the bug, which was occurring when the cache had older data than the tsm files and the cache was then exhausted while there were still more tsm values to include in subsequent Next() calls.

* chore(test): switch to using the testify library in the new test

  The assert package is used for post-conditions and the require package is used for pre-conditions in the test. The library provides clearer intent of the conditions and better formatting of information (message and data) when an assertion fails. It also makes the test easier to read by reducing ifs, though some lines are longer. This commit addresses PR requests.

(cherry picked from commit 9d1e79d)
1 parent e51acc4 commit 617d94f
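To make the failure described in the commit message concrete, here is a minimal sketch, using plain slices rather than the real cursor types or buffer management, of how a whole-block copy that ignores how far the TSM block has already been read re-emits points returned by an earlier Next() call:

package main

import "fmt"

// Minimal sketch of the bug mechanism (plain slices, not the actual tsm1
// cursor code): an earlier Next() call already consumed part of the decoded
// TSM block, so a later copy of the whole block starting at index 0 repeats
// values that were already returned.
func main() {
    tsmBlock := []int64{10000, 20000, 30000, 40000, 50000} // decoded TSM timestamps
    tsmPos := 2                                            // first two were merged with cache data in a previous call

    out := make([]int64, 0, 1000) // fresh response buffer for this call

    // Buggy fast path: only checks that the response buffer is untouched.
    if len(out) == 0 && cap(out) >= len(tsmBlock) {
        out = append(out, tsmBlock...) // duplicates tsmBlock[0] and tsmBlock[1]
    }
    fmt.Println("buggy:", out)

    // Fixed fast path: also requires that nothing of the block was consumed yet.
    out = out[:0]
    if len(out) == 0 && tsmPos == 0 && cap(out) >= len(tsmBlock) {
        out = append(out, tsmBlock...)
    } else {
        out = append(out, tsmBlock[tsmPos:]...) // copy only the unconsumed remainder
    }
    fmt.Println("fixed:", out)
}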

3 files changed: +172 -6 lines changed

tsdb/engine/tsm1/array_cursor.gen.go

Lines changed: 5 additions & 5 deletions
@@ -110,7 +110,7 @@ func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
@@ -402,7 +402,7 @@ func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
@@ -694,7 +694,7 @@ func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
@@ -986,7 +986,7 @@ func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
@@ -1278,7 +1278,7 @@ func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
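Restated outside the generated code, the corrected guard amounts to the predicate sketched below; the struct and field names are stand-ins inferred from the diff context, not the real cursor type:

package main

import "fmt"

// cursorState is a hypothetical stand-in for the pieces of the ascending
// cursor that the guard inspects.
type cursorState struct {
    pos      int // points already written into the current response buffer
    tsmPos   int // how much of the decoded TSM block was consumed, possibly by an earlier Next()
    resCap   int // len(c.res.Timestamps): capacity of the response buffer
    blockLen int // len(tvals.Timestamps): length of the decoded TSM block
}

// wholeBlockCopyAllowed mirrors the fixed condition: the fast path is only
// safe when neither the response buffer nor the TSM block has been partially
// used and the block fits entirely within the buffer.
func wholeBlockCopyAllowed(c cursorState) bool {
    return c.pos == 0 && c.tsmPos == 0 && c.resCap >= c.blockLen
}

func main() {
    // An earlier Next() consumed two TSM values alongside cache data, so even
    // with an empty response buffer the whole-block copy must be skipped.
    fmt.Println(wholeBlockCopyAllowed(cursorState{pos: 0, tsmPos: 2, resCap: 1000, blockLen: 8})) // false
    fmt.Println(wholeBlockCopyAllowed(cursorState{pos: 0, tsmPos: 0, resCap: 1000, blockLen: 8})) // true
}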

tsdb/engine/tsm1/array_cursor.gen.go.tmpl

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ func (c *{{$type}}) Next() {{$arrayType}} {
 
     if pos < len(c.res.Timestamps) {
         if c.tsm.pos < len(tvals.Timestamps) {
-            if pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps){
+            if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps){
                 // optimization: all points can be served from TSM data because
                 // we need the entire block and the block completely fits within
                 // the buffer.
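Because array_cursor.gen.go is generated from this template, the single-line template change above propagates to all five cursor types shown in the generated diff. As a rough illustration, assuming a hypothetical data structure (the real generator and its data file are not part of this diff), the expansion with text/template looks like this:

package main

import (
    "os"
    "text/template"
)

// cursorSpec is a hypothetical stand-in for the data that drives the
// template; the real generator uses its own data file and field names.
type cursorSpec struct {
    Type      string
    ArrayType string
}

const snippet = `{{range .}}{{$type := .Type}}{{$arrayType := .ArrayType}}
func (c *{{$type}}) Next() {{$arrayType}} {
    // ...
    if pos == 0 && c.tsm.pos == 0 && len(c.res.Timestamps) >= len(tvals.Timestamps) {
        // optimization: serve the entire block straight from TSM data.
    }
    // ...
}
{{end}}`

func main() {
    specs := []cursorSpec{
        {Type: "floatArrayAscendingCursor", ArrayType: "*tsdb.FloatArray"},
        {Type: "integerArrayAscendingCursor", ArrayType: "*tsdb.IntegerArray"},
        // ...unsigned, string and boolean variants follow the same pattern.
    }
    tmpl := template.Must(template.New("cursor").Parse(snippet))
    if err := tmpl.Execute(os.Stdout, specs); err != nil {
        panic(err)
    }
}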
Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
package tsm1

import (
    "context"
    "os"
    "testing"

    "github.com/influxdata/influxdb/tsdb"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// TestAscendingCursorDuplicateDataBug demonstrates the real bug that existed
// in the ascending cursor optimization. The bug is encountered when cache data is merged with tsm data, specifically
// in an earlier call to Next(), the cache must fill part but not all of the response buffer, which is 1000 points. TSM
// data then fills the rest. The cache must not be exhausted. The following Next() sees that only TSM data is needed,
// and incorrectly copies TSM data including the points from the previous Next() call. Additionally, cache data
// needs to be older than most of the TSM data. There needs to be enough tsm data younger than the cache data to completely
// fill the remaining space in the response buffer.
func TestAscendingCursorDuplicateDataBug(t *testing.T) {
    dir := MustTempDir()
    defer os.RemoveAll(dir)
    fs := NewFileStore(dir)
    assert := assert.New(t)
    require := require.New(t)

    tsmMinTime := 10
    tsmMaxTime := 70000
    // Create TSM data
    tsmData := []keyValues{
        {"measurement,field=value#!~#value", []Value{
            NewFloatValue(int64(tsmMinTime), 1.1), // tsm points can be older than the cache points
            // these next 5 points are the critical tsm points. They will be used after the cache is exhausted
            // as they are younger than all the cache points.
            NewFloatValue(10000, 10.1),
            NewFloatValue(20000, 20.2),
            NewFloatValue(30000, 30.3),
            NewFloatValue(40000, 40.4),
            NewFloatValue(50000, 50.5),
            // there can be more tsm values - additional values here don't impact the result
            NewFloatValue(60000, 60.5),
            NewFloatValue(int64(tsmMaxTime), 70.5),
        }},
    }

    files, err := newFiles(dir, tsmData...)
    require.NoError(err, "error creating files")

    err = fs.Replace(nil, files)
    require.NoError(err, "error updating filestore")

    // Create cache data that triggers the bug scenario:
    // Cache fills most of the buffer, TSM provides remaining data
    var cacheValues Values
    bufferSize := tsdb.DefaultMaxPointsPerBlock

    // Cache: 100 to 100+bufferSize-5 (leaving room for some TSM data)
    // The cache data timestamps are older than the TSM data timestamps
    for i := 0; i < bufferSize-5; i++ {
        cacheValues = append(cacheValues, NewFloatValue(int64(100+i), float64(100+i)))
    }

    // Verify precondition: TSM and cache timestamps must not overlap
    tsmTimestamps := make(map[int64]bool)
    for _, v := range tsmData[0].values {
        tsmTimestamps[v.UnixNano()] = true
    }

    for _, cv := range cacheValues {
        require.Falsef(tsmTimestamps[cv.UnixNano()], "Test precondition failed: TSM and cache data share timestamp %d. This will interfere with test results.", cv.UnixNano())
    }
    // Verify the cache fills most but not all of the buffer
    require.Lessf(len(cacheValues), bufferSize, "Test precondition failed: Cache must not fill entire buffer (size %d), has %d values", bufferSize, len(cacheValues))

    // Verify cache has sufficient data to trigger the bug
    minCacheSize := bufferSize - 10 // Leave room for at least some TSM data
    require.GreaterOrEqualf(len(cacheValues), minCacheSize, "Test precondition failed: Cache must have at least %d values to trigger bug, has %d", minCacheSize, len(cacheValues))

    // Verify TSM has data both before and after cache range
    var tsmBeforeCache, tsmAfterCache int
    cacheMin, cacheMax := cacheValues[0].UnixNano(), cacheValues[len(cacheValues)-1].UnixNano()
    for _, v := range tsmData[0].values {
        if v.UnixNano() < cacheMin {
            tsmBeforeCache++
        } else if v.UnixNano() > cacheMax {
            tsmAfterCache++
        }
    }
    require.GreaterOrEqualf(tsmAfterCache, 2, "Test precondition failed: Need at least 2 TSM points after cache range to trigger bug, has %d", tsmAfterCache)

    kc := fs.KeyCursor(context.Background(), []byte("measurement,field=value#!~#value"), 0, true)
    defer kc.Close()

    cursor := newFloatArrayAscendingCursor()
    defer cursor.Close()

    // Find min and max timestamps across both TSM and cache data
    minTime := int64(tsmMinTime) // Start with first TSM timestamp
    maxTime := int64(tsmMaxTime) // Start with last TSM timestamp

    // Check if cache has earlier timestamps
    if len(cacheValues) > 0 && cacheValues[0].UnixNano() < minTime {
        minTime = cacheValues[0].UnixNano()
    }

    // Check if cache has later timestamps
    if len(cacheValues) > 0 && cacheValues[len(cacheValues)-1].UnixNano() > maxTime {
        maxTime = cacheValues[len(cacheValues)-1].UnixNano()
    }

    // Add buffer to ensure we capture all data
    maxTime += 1000

    // search over the whole time range, ascending
    err = cursor.reset(minTime, maxTime, cacheValues, kc)
    require.NoError(err, "error resetting cursor")

    // Collect all timestamps and values from all Next() calls
    var allTimestamps []int64
    var allValues []float64
    seenTimestamps := make(map[int64]bool)
    callNum := 0

    for {
        result := cursor.Next() // run to exhaustion of the cursor
        if len(result.Timestamps) == 0 {
            break
        }
        callNum++

        // Verify timestamp/value counts match
        assert.Equalf(len(result.Timestamps), len(result.Values), "Call %d: Timestamp/value count mismatch: %d timestamps, %d values",
            callNum, len(result.Timestamps), len(result.Values))

        for i, ts := range result.Timestamps {
            if seenTimestamps[ts] {
                assert.Failf("DUPLICATE DATA BUG", "at call %d, position %d: Timestamp %d returned multiple times",
                    callNum, i, ts)
                // Print first few timestamps from this batch for debugging
                if len(result.Timestamps) > 0 {
                    endIdx := 10
                    if endIdx > len(result.Timestamps) {
                        endIdx = len(result.Timestamps)
                    }
                    t.Logf("First %d timestamps from call %d: %v", endIdx, callNum, result.Timestamps[:endIdx])
                }
            }
            seenTimestamps[ts] = true
            allTimestamps = append(allTimestamps, ts)
        }

        // Collect values for verification
        allValues = append(allValues, result.Values...)
    }

    // Verify no duplicates were found
    assert.Equalf(len(seenTimestamps), len(allTimestamps), "Found duplicate timestamps! Total: %d, Unique: %d", len(allTimestamps), len(seenTimestamps))

    // Verify we got all expected data (cache + TSM)
    assert.Equalf(len(cacheValues)+len(tsmData[0].values), len(allTimestamps), "total timestamps")

    // Verify data is in ascending order
    for i := 1; i < len(allTimestamps); i++ {
        assert.Greaterf(allTimestamps[i], allTimestamps[i-1], "Timestamps not in ascending order: %d followed by %d", allTimestamps[i-1], allTimestamps[i])
    }
}
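On the testify convention the commit message mentions (require for preconditions, assert for postconditions), here is a tiny hypothetical example of the difference in failure behavior:

package example

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// TestRequireVsAssert is a hypothetical illustration of the convention used in
// the test above: require aborts the test on a failed precondition, while
// assert records a failed postcondition and keeps checking the rest.
func TestRequireVsAssert(t *testing.T) {
    values := []int{1, 2, 3}

    // Precondition: without data the remaining checks are meaningless, so a
    // failure here stops the test immediately.
    require.NotEmpty(t, values)

    // Postconditions: failures are reported but the test continues, so one
    // run surfaces every violated expectation.
    assert.Len(t, values, 3)
    assert.Greater(t, values[2], values[1])
}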
