Skip to content

Commit d022b78

Browse files
authored
fix: SelectSeries not correct for single reports (#4617)
* fix: SelectSeries not correct for single reports The query-backend will only call the aggregate functions, when there is more than one report for particular type. In the time series case the aggregate function is also the one that ensures order/correctness of the shape of data. I considered doing this inside every query, but eventually it felt better to do this when we would normally "aggregate" * Ensure we are holding the lock when initializing the aggreagator. * Use error
1 parent 2241c41 commit d022b78

12 files changed

+309
-20
lines changed

pkg/querybackend/block_reader_test.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,18 +239,20 @@ func (s *testSuite) Test_SeriesLabels() {
239239
s.Assert().JSONEq(string(expected), string(actual))
240240
}
241241

242+
var startTime = time.Unix(1739263329, 0)
243+
242244
func (s *testSuite) Test_QueryTimeSeries() {
243245
query := &queryv1.Query{
244246
QueryType: queryv1.QueryType_QUERY_TIME_SERIES,
245247
TimeSeries: &queryv1.TimeSeriesQuery{
246248
GroupBy: []string{"service_name"},
247-
Step: 1.0, // 1 second step
249+
Step: 30.0,
248250
},
249251
}
250252

251253
req := &queryv1.InvokeRequest{
252-
StartTime: time.Now().Add(-1 * time.Hour).UnixMilli(),
253-
EndTime: time.Now().UnixMilli(),
254+
StartTime: startTime.UnixMilli(),
255+
EndTime: startTime.Add(time.Hour).UnixMilli(),
254256
Query: []*queryv1.Query{query},
255257
QueryPlan: s.plan,
256258
LabelSelector: "{}",
@@ -262,6 +264,47 @@ func (s *testSuite) Test_QueryTimeSeries() {
262264
s.Require().NotNil(resp)
263265
s.Require().Len(resp.Reports, 1)
264266
s.Require().NotNil(resp.Reports[0].TimeSeries)
267+
268+
actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries)
269+
expected, err := os.ReadFile("testdata/fixtures/time_series.json")
270+
s.Require().NoError(err)
271+
s.Assert().JSONEq(string(expected), string(actual))
272+
}
273+
274+
// When there is only one report we don't run the aggregate method. This check ensures that the timeseries, is still correctly formatted.
275+
func (s *testSuite) Test_QueryTimeSeriesOneReport() {
276+
query := &queryv1.Query{
277+
QueryType: queryv1.QueryType_QUERY_TIME_SERIES,
278+
TimeSeries: &queryv1.TimeSeriesQuery{
279+
GroupBy: []string{"service_name"},
280+
Step: 30.0,
281+
},
282+
}
283+
284+
// shorten plan so there is only one report
285+
shorterPlan := s.plan.CloneVT()
286+
shorterPlan.Root = s.plan.Root.CloneVT()
287+
shorterPlan.Root.Blocks = s.plan.Root.Blocks[:1]
288+
289+
req := &queryv1.InvokeRequest{
290+
StartTime: startTime.UnixMilli(),
291+
EndTime: startTime.Add(time.Hour).UnixMilli(),
292+
Query: []*queryv1.Query{query},
293+
QueryPlan: shorterPlan,
294+
LabelSelector: "{}",
295+
Tenant: s.tenant,
296+
}
297+
298+
resp, err := s.reader.Invoke(s.ctx, req)
299+
s.Require().NoError(err)
300+
s.Require().NotNil(resp)
301+
s.Require().Len(resp.Reports, 1)
302+
s.Require().NotNil(resp.Reports[0].TimeSeries)
303+
304+
actual, _ := json.Marshal(resp.Reports[0].TimeSeries.TimeSeries)
305+
expected, err := os.ReadFile("testdata/fixtures/time_series_first_block.json")
306+
s.Require().NoError(err)
307+
s.Assert().JSONEq(string(expected), string(actual))
265308
}
266309

267310
func (s *testSuite) Test_QueryTree_All_Tenant_Isolation() {

pkg/querybackend/query.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ func registerQueryType(
6767
rt queryv1.ReportType,
6868
q queryHandler,
6969
a aggregatorProvider,
70+
alwaysAggregate bool, // this option will always call the aggregate method for this report type, so it will also run when there is only one report
7071
deps ...block.Section,
7172
) {
7273
registerQueryReportType(qt, rt)
7374
registerQueryHandler(qt, q)
7475
registerQueryDependencies(qt, deps...)
75-
registerAggregator(rt, a)
76+
registerAggregator(rt, a, alwaysAggregate)
7677
}
7778

7879
type blockContext struct {

pkg/querybackend/query_label_names.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func init() {
1818
queryv1.ReportType_REPORT_LABEL_NAMES,
1919
queryLabelNames,
2020
newLabelNameAggregator,
21+
false,
2122
[]block.Section{block.SectionTSDB}...,
2223
)
2324
}

pkg/querybackend/query_label_values.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func init() {
2020
queryv1.ReportType_REPORT_LABEL_VALUES,
2121
queryLabelValues,
2222
newLabelValueAggregator,
23+
false,
2324
[]block.Section{block.SectionTSDB}...,
2425
)
2526
}

pkg/querybackend/query_pprof.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func init() {
2121
queryv1.ReportType_REPORT_PPROF,
2222
queryPprof,
2323
newPprofAggregator,
24+
false,
2425
[]block.Section{
2526
block.SectionTSDB,
2627
block.SectionProfiles,

pkg/querybackend/query_series_labels.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func init() {
2121
queryv1.ReportType_REPORT_SERIES_LABELS,
2222
querySeriesLabels,
2323
newSeriesLabelsAggregator,
24+
false,
2425
[]block.Section{block.SectionTSDB}...,
2526
)
2627
}

pkg/querybackend/query_time_series.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func init() {
2222
queryv1.ReportType_REPORT_TIME_SERIES,
2323
queryTimeSeries,
2424
newTimeSeriesAggregator,
25+
true,
2526
[]block.Section{
2627
block.SectionTSDB,
2728
block.SectionProfiles,

pkg/querybackend/query_tree.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func init() {
1919
queryv1.ReportType_REPORT_TREE,
2020
queryTree,
2121
newTreeAggregator,
22+
false,
2223
[]block.Section{
2324
block.SectionTSDB,
2425
block.SectionProfiles,

pkg/querybackend/report_aggregator.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
var (
1111
aggregatorMutex = new(sync.RWMutex)
1212
aggregators = map[queryv1.ReportType]aggregatorProvider{}
13+
alwaysAggregate = map[queryv1.ReportType]struct{}{}
1314
queryReportType = map[queryv1.QueryType]queryv1.ReportType{}
1415
)
1516

@@ -23,14 +24,29 @@ type aggregator interface {
2324
build() *queryv1.Report
2425
}
2526

26-
func registerAggregator(t queryv1.ReportType, ap aggregatorProvider) {
27+
func registerAggregator(t queryv1.ReportType, ap aggregatorProvider, always bool) {
2728
aggregatorMutex.Lock()
2829
defer aggregatorMutex.Unlock()
2930
_, ok := aggregators[t]
3031
if ok {
3132
panic(fmt.Sprintf("%s: aggregator already registered", t))
3233
}
3334
aggregators[t] = ap
35+
36+
if always {
37+
_, ok := alwaysAggregate[t]
38+
if ok {
39+
panic(fmt.Sprintf("%s: aggregator already registered to always aggregat", t))
40+
}
41+
alwaysAggregate[t] = struct{}{}
42+
}
43+
}
44+
45+
func isAlwaysAggregate(t queryv1.ReportType) bool {
46+
aggregatorMutex.RLock()
47+
defer aggregatorMutex.RUnlock()
48+
_, result := alwaysAggregate[t]
49+
return result
3450
}
3551

3652
func getAggregator(r *queryv1.InvokeRequest, x *queryv1.Report) (aggregator, error) {
@@ -97,13 +113,22 @@ func (ra *reportAggregator) aggregateReport(r *queryv1.Report) (err error) {
97113
ra.sm.Lock()
98114
v, found := ra.staged[r.ReportType]
99115
if !found {
100-
// We delay aggregation until we have at least two
101-
// reports of the same type. Otherwise, we just store
102-
// the report and will return it as is, if it is the
103-
// only one.
104-
ra.staged[r.ReportType] = r
116+
// For most ReportTypes we delay aggregation until we have at least two
117+
// reports of the same type. In case there is only one we will
118+
// return it as is.
119+
if !isAlwaysAggregate(r.ReportType) {
120+
ra.staged[r.ReportType] = r
121+
ra.sm.Unlock()
122+
return nil
123+
}
124+
125+
// Some ReportTypes need to call the aggregator for correctness even when
126+
// there is only single instance, in that case call the aggregator right
127+
// away and mark the report type appropriately in the staged map.
128+
err = ra.aggregateReportNoCheck(r)
129+
ra.staged[r.ReportType] = nil
105130
ra.sm.Unlock()
106-
return nil
131+
return err
107132
}
108133
// Found a staged report of the same type.
109134
if v != nil {

pkg/querybackend/report_aggregator_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func mockAggregatorProvider(req *queryv1.InvokeRequest) aggregator {
5050

5151
func TestReportAggregator_SingleReport(t *testing.T) {
5252
reportType := queryv1.ReportType(999) // use a high number that won't conflict with other registrations
53-
registerAggregator(reportType, mockAggregatorProvider)
53+
registerAggregator(reportType, mockAggregatorProvider, false)
5454
defer func() {
5555
aggregatorMutex.Lock()
5656
delete(aggregators, reportType)
@@ -78,7 +78,7 @@ func TestReportAggregator_SingleReport(t *testing.T) {
7878

7979
func TestReportAggregator_TwoReports(t *testing.T) {
8080
reportType := queryv1.ReportType(999)
81-
registerAggregator(reportType, mockAggregatorProvider)
81+
registerAggregator(reportType, mockAggregatorProvider, false)
8282
defer func() {
8383
aggregatorMutex.Lock()
8484
delete(aggregators, reportType)
@@ -115,8 +115,8 @@ func TestReportAggregator_MultipleTypes(t *testing.T) {
115115
type1 := queryv1.ReportType(999)
116116
type2 := queryv1.ReportType(998)
117117

118-
registerAggregator(type1, mockAggregatorProvider)
119-
registerAggregator(type2, mockAggregatorProvider)
118+
registerAggregator(type1, mockAggregatorProvider, false)
119+
registerAggregator(type2, mockAggregatorProvider, false)
120120
defer func() {
121121
aggregatorMutex.Lock()
122122
delete(aggregators, type1)
@@ -167,7 +167,7 @@ func TestReportAggregator_NilReport(t *testing.T) {
167167

168168
func TestReportAggregator_AggregateResponse(t *testing.T) {
169169
reportType := queryv1.ReportType(999)
170-
registerAggregator(reportType, mockAggregatorProvider)
170+
registerAggregator(reportType, mockAggregatorProvider, false)
171171
defer func() {
172172
aggregatorMutex.Lock()
173173
delete(aggregators, reportType)
@@ -194,7 +194,7 @@ func TestReportAggregator_AggregateResponse(t *testing.T) {
194194

195195
func TestReportAggregator_ConcurrentAccess(t *testing.T) {
196196
reportType := queryv1.ReportType(999)
197-
registerAggregator(reportType, mockAggregatorProvider)
197+
registerAggregator(reportType, mockAggregatorProvider, false)
198198
defer func() {
199199
aggregatorMutex.Lock()
200200
delete(aggregators, reportType)
@@ -230,7 +230,7 @@ func TestReportAggregator_ConcurrentAccess(t *testing.T) {
230230

231231
func TestGetAggregator(t *testing.T) {
232232
reportType := queryv1.ReportType(999)
233-
registerAggregator(reportType, mockAggregatorProvider)
233+
registerAggregator(reportType, mockAggregatorProvider, false)
234234
defer func() {
235235
aggregatorMutex.Lock()
236236
delete(aggregators, reportType)
@@ -256,9 +256,9 @@ func TestGetAggregator_UnknownReportType(t *testing.T) {
256256
func TestRegisterAggregator_Duplicate(t *testing.T) {
257257
reportType := queryv1.ReportType(999)
258258

259-
registerAggregator(reportType, mockAggregatorProvider)
259+
registerAggregator(reportType, mockAggregatorProvider, false)
260260
assert.Panics(t, func() {
261-
registerAggregator(reportType, mockAggregatorProvider)
261+
registerAggregator(reportType, mockAggregatorProvider, false)
262262
})
263263

264264
aggregatorMutex.Lock()

0 commit comments

Comments
 (0)