Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ all-tests-with-coverage:

# bench-select runs the BenchmarkSelect benchmarks under ./queryable/... once,
# with a 1s benchtime. -run='^$$' is the make-escaped form of '^$', which
# matches no test names, so only benchmarks execute.
bench-select:
$(GOTEST) -bench=BenchmarkSelect -run='^$$' ./queryable/... -benchtime=1s -count=1

# lint runs golangci-lint with the .golangci.yml configuration. Because of
# --fix, it rewrites files in place for issues the linters can fix automatically.
lint:
golangci-lint -c .golangci.yml run --fix
18 changes: 13 additions & 5 deletions queryable/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type queryableOpts struct {
dataBytesLimitFunc search.QuotaLimitFunc
materializedSeriesCallback search.MaterializedSeriesFunc
materializedLabelsFilterCallback search.MaterializedLabelsFilterCallback

cacheRowRangesForConstraints bool
cacheRowRangesForConstraints bool
honorProjectionHints bool
}

var DefaultQueryableOpts = queryableOpts{
Expand All @@ -58,8 +58,8 @@ var DefaultQueryableOpts = queryableOpts{
dataBytesLimitFunc: search.NoopQuotaLimitFunc,
materializedSeriesCallback: search.NoopMaterializedSeriesFunc,
materializedLabelsFilterCallback: search.NoopMaterializedLabelsFilterCallback,

cacheRowRangesForConstraints: false,
cacheRowRangesForConstraints: false,
honorProjectionHints: false,
}

type QueryableOpts func(*queryableOpts)
Expand Down Expand Up @@ -115,6 +115,14 @@ func WithCacheRowRangesForConstraints(cache bool) QueryableOpts {
}
}

// WithHonorProjectionHints returns an option that switches projection pushdown
// on or off by setting honorProjectionHints on the queryable options.
// With the flag on, only the labels listed in SelectHints.ProjectionLabels are
// materialized.
func WithHonorProjectionHints(honor bool) QueryableOpts {
	setHonor := func(o *queryableOpts) {
		o.honorProjectionHints = honor
	}
	return setHonor
}

type parquetQueryable struct {
shardsFinder ShardsFinderFunction
constraintCacheFunc ConstraintCacheFunction
Expand Down Expand Up @@ -390,7 +398,7 @@ func newQueryableShard(
return nil, err
}
materializer, err := search.NewMaterializer(
shardSchema, chunksDecoder, shard, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback)
shardSchema, chunksDecoder, shard, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback, opts.honorProjectionHints)
if err != nil {
return nil, err
}
Expand Down
135 changes: 135 additions & 0 deletions queryable/parquet_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,3 +1082,138 @@ func TestQueryableWithMultipleShards(t *testing.T) {
}
})
}

// parquetStorageWrapper is a test helper that exposes a parquet-backed
// Queryable through a Querier method so it can be handed to the PromQL engine.
// Only Querier and Close are usable; every other method panics.
type parquetStorageWrapper struct {
	// queryable builds the Queryable to delegate to. It is invoked on every
	// Querier call.
	queryable func() (prom_storage.Queryable, error)
}

// Appender is not supported by this wrapper and always panics.
func (*parquetStorageWrapper) Appender(context.Context) prom_storage.Appender {
	panic("not implemented")
}

// ChunkQuerier is not supported by this wrapper and always panics.
func (*parquetStorageWrapper) ChunkQuerier(int64, int64) (prom_storage.ChunkQuerier, error) {
	panic("not implemented")
}

// Querier builds a Queryable via the configured factory and returns its
// Querier for the [mint, maxt] range. A factory error is returned unchanged.
func (w *parquetStorageWrapper) Querier(mint, maxt int64) (prom_storage.Querier, error) {
	underlying, buildErr := w.queryable()
	if buildErr != nil {
		return nil, buildErr
	}
	return underlying.Querier(mint, maxt)
}

// StartTime is not supported by this wrapper and always panics.
func (*parquetStorageWrapper) StartTime() (int64, error) {
	panic("not implemented")
}

// Close is a no-op; the wrapper holds nothing that needs releasing.
func (*parquetStorageWrapper) Close() error {
	return nil
}

// TestProjectionPushdownIntegration exercises projection hints end to end
// against a parquet shard built from an in-memory TSDB head.
//
// It checks two things with WithHonorProjectionHints(true):
//  1. A PromQL aggregation ("sum by (service)") evaluated through the engine
//     still produces the expected per-service sums.
//  2. A direct Select call with ProjectionLabels set returns every series, and
//     each returned series carries only the projected labels.
func TestProjectionPushdownIntegration(t *testing.T) {
	ctx := context.Background()

	st := teststorage.New(t)
	t.Cleanup(func() { _ = st.Close() })

	app := st.Appender(ctx)

	// Ten series of the same metric: five with service="api" and five with
	// service="web", differing in region, instance and status.
	seriesLabels := []labels.Labels{
		labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-1", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-2", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-west", "instance", "server-1", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-west", "instance", "server-2", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-east", "instance", "server-1", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-east", "instance", "server-2", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-1", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-2", "status", "200"),
		labels.FromStrings("__name__", "http_requests_total", "service", "api", "region", "us-east", "instance", "server-3", "status", "404"),
		labels.FromStrings("__name__", "http_requests_total", "service", "web", "region", "us-west", "instance", "server-3", "status", "500"),
	}

	// Five samples per series, one minute apart starting at t=0, all with
	// value 1.0.
	for _, lbls := range seriesLabels {
		for i := 0; i < 5; i++ {
			_, err := app.Append(0, lbls, int64(i*60000), 1.0)
			require.NoError(t, err)
		}
	}
	require.NoError(t, app.Commit())

	bkt, err := newBucket(t.TempDir())
	require.NoError(t, err)
	t.Cleanup(func() { _ = bkt.Close() })

	// Convert the head's contents into a single parquet shard.
	h := st.Head()
	data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()}
	shard := convertToParquet(t, ctx, bkt, data, h, nil)

	// Part 1: run a PromQL aggregation through the engine, backed by the
	// parquet queryable with projection hints honored.
	parquetStorage := &parquetStorageWrapper{
		queryable: func() (prom_storage.Queryable, error) {
			return createQueryable([]storage.ParquetShard{shard}, WithHonorProjectionHints(true))
		},
	}

	engine := promql.NewEngine(promql.EngineOpts{
		Timeout:    30 * time.Second,
		MaxSamples: 50000,
	})
	t.Cleanup(func() { _ = engine.Close() })

	query := "sum by (service) (http_requests_total)"

	qry, err := engine.NewInstantQuery(ctx, parquetStorage, nil, query, time.Unix(300, 0))
	require.NoError(t, err)
	// Release the query's resources once the test is done with its result.
	// Cleanups run in LIFO order, so this runs before the engine is closed.
	t.Cleanup(func() { qry.Close() })

	result := qry.Exec(ctx)
	require.NoError(t, result.Err)

	vector, ok := result.Value.(promql.Vector)
	require.True(t, ok, "Expected Vector result, got %T", result.Value)

	require.Len(t, vector, 2, "Expected 2 aggregated series (api and web)")

	serviceValues := make(map[string]float64)
	for _, sample := range vector {
		serviceName := sample.Metric.Get("service")
		require.NotEmpty(t, serviceName, "service label should be present")
		require.True(t, serviceName == "api" || serviceName == "web", "service should be api or web")

		require.Equal(t, 1, sample.Metric.Len(), "Should only have service label after aggregation")

		serviceValues[serviceName] = sample.F
	}

	// Each service has five series whose samples are all 1.0, so each sum is 5.
	require.Equal(t, 5.0, serviceValues["api"], "api service should sum to 5.0")
	require.Equal(t, 5.0, serviceValues["web"], "web service should sum to 5.0")

	// Part 2: call Select directly with explicit projection hints and inspect
	// the labels of the returned series.
	queryableWithProjection, err := createQueryable([]storage.ParquetShard{shard}, WithHonorProjectionHints(true))
	require.NoError(t, err)
	querier, err := queryableWithProjection.Querier(data.MinTime, data.MaxTime)
	require.NoError(t, err)
	// The querier must be closed to release whatever it holds.
	t.Cleanup(func() { _ = querier.Close() })

	hints := &prom_storage.SelectHints{
		ProjectionLabels:  []string{"__name__", "service"},
		ProjectionInclude: true,
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"),
	}

	ss := querier.Select(ctx, true, hints, matchers...)
	var projectedSeries []prom_storage.Series

	for ss.Next() {
		s := ss.At()
		projectedSeries = append(projectedSeries, s)

		// Only the projected labels may appear on a returned series.
		s.Labels().Range(func(lbl labels.Label) {
			require.True(t,
				lbl.Name == "__name__" || lbl.Name == "service",
				"Unexpected label %s found in projected result", lbl.Name)
		})
	}
	require.NoError(t, ss.Err())
	require.Len(t, projectedSeries, 10, "Should have all 10 original series but with projected labels")
}
Loading
Loading