Skip to content

Commit d7c57a3

Browse files
committed
high level tests, more work to be done
Signed-off-by: Jesus Vazquez <[email protected]>
1 parent 6959efb commit d7c57a3

File tree

2 files changed

+138
-4
lines changed

2 files changed

+138
-4
lines changed

queryable/parquet_queryable_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,3 +1078,138 @@ func TestQueryableWithMultipleShards(t *testing.T) {
10781078
}
10791079
})
10801080
}
1081+
1082+
type parquetStorageWrapper struct {
1083+
queryable func() (prom_storage.Queryable, error)
1084+
}
1085+
1086+
func (w *parquetStorageWrapper) Appender(_ context.Context) prom_storage.Appender {
1087+
panic("not implemented")
1088+
}
1089+
1090+
func (w *parquetStorageWrapper) ChunkQuerier(_, _ int64) (prom_storage.ChunkQuerier, error) {
1091+
panic("not implemented")
1092+
}
1093+
1094+
func (w *parquetStorageWrapper) Querier(mint, maxt int64) (prom_storage.Querier, error) {
1095+
q, err := w.queryable()
1096+
if err != nil {
1097+
return nil, err
1098+
}
1099+
return q.Querier(mint, maxt)
1100+
}
1101+
1102+
func (w *parquetStorageWrapper) StartTime() (int64, error) {
1103+
panic("not implemented")
1104+
}
1105+
1106+
func (w *parquetStorageWrapper) Close() error {
1107+
return nil
1108+
}
1109+
1110+
func TestProjectionPushdownIntegration(t *testing.T) {
1111+
ctx := context.Background()
1112+
1113+
st := teststorage.New(t)
1114+
t.Cleanup(func() { _ = st.Close() })
1115+
1116+
app := st.Appender(ctx)
1117+
1118+
seriesLabels := []labels.Labels{
1119+
labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-1", "status", "200"),
1120+
labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-2", "status", "200"),
1121+
labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-west", "instance", "server-1", "status", "200"),
1122+
labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-west", "instance", "server-2", "status", "200"),
1123+
labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-east", "instance", "server-1", "status", "200"),
1124+
labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-east", "instance", "server-2", "status", "200"),
1125+
labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-1", "status", "200"),
1126+
labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-2", "status", "200"),
1127+
labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-3", "status", "404"),
1128+
labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-3", "status", "500"),
1129+
}
1130+
1131+
for _, lbls := range seriesLabels {
1132+
for i := 0; i < 5; i++ {
1133+
_, err := app.Append(0, lbls, int64(i*60000), 1.0)
1134+
require.NoError(t, err)
1135+
}
1136+
}
1137+
require.NoError(t, app.Commit())
1138+
1139+
bkt, err := newBucket(t.TempDir())
1140+
require.NoError(t, err)
1141+
t.Cleanup(func() { _ = bkt.Close() })
1142+
1143+
h := st.Head()
1144+
data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()}
1145+
shard := convertToParquet(t, ctx, bkt, data, h, nil)
1146+
1147+
parquetStorage := &parquetStorageWrapper{
1148+
queryable: func() (prom_storage.Queryable, error) {
1149+
return createQueryable([]storage.ParquetShard{shard}, WithHonorProjectionHints(true))
1150+
},
1151+
}
1152+
1153+
engine := promql.NewEngine(promql.EngineOpts{
1154+
Timeout: 30 * time.Second,
1155+
MaxSamples: 50000,
1156+
})
1157+
t.Cleanup(func() { _ = engine.Close() })
1158+
1159+
query := "sum by (service) (http_requests_total)"
1160+
1161+
qry, err := engine.NewInstantQuery(ctx, parquetStorage, nil, query, time.Unix(300, 0))
1162+
require.NoError(t, err)
1163+
1164+
result := qry.Exec(ctx)
1165+
require.NoError(t, result.Err)
1166+
1167+
vector, ok := result.Value.(promql.Vector)
1168+
require.True(t, ok, "Expected Vector result, got %T", result.Value)
1169+
1170+
require.Len(t, vector, 2, "Expected 2 aggregated series (api and web)")
1171+
1172+
serviceValues := make(map[string]float64)
1173+
for _, sample := range vector {
1174+
serviceName := sample.Metric.Get("service")
1175+
require.NotEmpty(t, serviceName, "service label should be present")
1176+
require.True(t, serviceName == "api" || serviceName == "web", "service should be api or web")
1177+
1178+
require.Equal(t, 1, sample.Metric.Len(), "Should only have service label after aggregation")
1179+
1180+
serviceValues[serviceName] = sample.F
1181+
}
1182+
1183+
require.Equal(t, 5.0, serviceValues["api"], "api service should sum to 5.0")
1184+
require.Equal(t, 5.0, serviceValues["web"], "web service should sum to 5.0")
1185+
1186+
queryableWithProjection, err := createQueryable([]storage.ParquetShard{shard}, WithHonorProjectionHints(true))
1187+
require.NoError(t, err)
1188+
querier, err := queryableWithProjection.Querier(data.MinTime, data.MaxTime)
1189+
require.NoError(t, err)
1190+
1191+
hints := &prom_storage.SelectHints{
1192+
ProjectionLabels: []string{"__name__", "service"},
1193+
ProjectionInclude: true,
1194+
}
1195+
1196+
matchers := []*labels.Matcher{
1197+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"),
1198+
}
1199+
1200+
ss := querier.Select(ctx, true, hints, matchers...)
1201+
var projectedSeries []prom_storage.Series
1202+
1203+
for ss.Next() {
1204+
s := ss.At()
1205+
projectedSeries = append(projectedSeries, s)
1206+
1207+
s.Labels().Range(func(lbl labels.Label) {
1208+
require.True(t,
1209+
lbl.Name == "__name__" || lbl.Name == "service",
1210+
"Unexpected label %s found in projected result", lbl.Name)
1211+
})
1212+
}
1213+
require.NoError(t, ss.Err())
1214+
require.Len(t, projectedSeries, 10, "Should have all 10 original series but with projected labels")
1215+
}

search/materialize.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,9 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R
483483
return results, nil
484484
}
485485

486-
// MaterializeLabels creates labels for series respecting projection hints when enabled.
487-
// If honorProjectionHints is false or hints is nil, it behaves like MaterializeAllLabels.
488-
// If honorProjectionHints is true and ProjectionLabels are specified, it only materializes
489-
// the requested labels. The s_series_hash column is only included if explicitly requested.
486+
// MaterializeLabels retrieves series labels, optionally filtered by projection hints.
487+
// Returns all labels when projection is disabled, or only requested labels in the hints when enabled.
488+
// The s_series_hash column is included only when explicitly requested.
490489
func (m *Materializer) MaterializeLabels(ctx context.Context, hints *prom_storage.SelectHints, rgi int, rr []RowRange) ([][]labels.Label, error) {
491490
ctx, span := tracer.Start(ctx, "Materializer.MaterializeLabels")
492491
var err error

0 commit comments

Comments
 (0)