This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit aa707cf

Change subsystem to type label.

As per the design doc, the type label can now take the values ['metric', 'trace']. This commit ensures that, leaving the subsystem label for values such as ['metric_batcher', 'copier'], which will be added in another PR responsible for updating all metric-path metrics.

Signed-off-by: Harkishen-Singh <[email protected]>

1 parent 5d046b5 commit aa707cf
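
For context, the practical effect at the call sites (see the pkg/jaeger/query/query.go diff below) is a straight label rename; a representative before/after fragment, assuming the promscale pkg/pgmodel/metrics package and the Prometheus Go client as already imported there:

// Before this commit:
metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "200"}).Inc()

// After this commit: "type" is restricted to "metric" or "trace".
metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_trace", "code": "200"}).Inc()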

File tree

7 files changed: +74, -65 lines


pkg/jaeger/query/query.go

Lines changed: 24 additions & 24 deletions
@@ -44,83 +44,83 @@ func (p *Query) SpanWriter() spanstore.Writer {
 }
 
 func (p *Query) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_trace", "code": ""}).Inc()
     start := time.Now()
     res, err := getTrace(ctx, p.conn, traceID)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "200"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_trace", "code": "200"}).Inc()
         traceRequestsExec.Add(1)
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "get_trace"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_trace", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_trace", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
 
 func (p *Query) GetServices(ctx context.Context) ([]string, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_services", "code": ""}).Inc()
     start := time.Now()
     res, err := getServices(ctx, p.conn)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "200"}).Inc()
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_services", "code": "200"}).Inc()
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "get_services"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_services", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_services", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
 
 func (p *Query) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_operations", "code": ""}).Inc()
     start := time.Now()
     res, err := getOperations(ctx, p.conn, query)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "200"}).Inc()
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_operations", "code": "200"}).Inc()
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "get_operations"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_operations", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_operations", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
 
 func (p *Query) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_traces", "code": ""}).Inc()
     start := time.Now()
     res, err := findTraces(ctx, p.conn, query)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "200"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_traces", "code": "200"}).Inc()
         traceRequestsExec.Add(1)
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "find_traces"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_traces", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_traces", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
 
 func (p *Query) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_trace_ids", "code": ""}).Inc()
     start := time.Now()
     res, err := findTraceIDs(ctx, p.conn, query)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "200"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_trace_ids", "code": "200"}).Inc()
         traceRequestsExec.Add(1)
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "find_trace_ids"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "find_trace_ids", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "find_trace_ids", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
 
 func (p *Query) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
-    metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": ""}).Inc()
+    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_dependencies", "code": ""}).Inc()
     start := time.Now()
     res, err := getDependencies(ctx, p.conn, endTs, lookback)
     if err == nil {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "200"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_dependencies", "code": "200"}).Inc()
         dependencyRequestsExec.Add(1)
-        metrics.RequestsDuration.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies"}).Observe(time.Since(start).Seconds())
+        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": "get_dependencies"}).Observe(time.Since(start).Seconds())
     } else {
-        metrics.RequestsTotal.With(prometheus.Labels{"subsystem": "trace", "handler": "get_dependencies", "code": "500"}).Inc()
+        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": "get_dependencies", "code": "500"}).Inc()
     }
     return res, logError(err)
 }
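
All six handlers above repeat the same count/time/outcome sequence with the new "type" label; some additionally bump traceRequestsExec or dependencyRequestsExec on success. As a summary only, here is a hypothetical helper that is not part of this commit, assuming the same metrics and prometheus imports that query.go already uses:

// instrument is a hypothetical wrapper (illustration only): it counts the
// request, times the storage call, and records the outcome as an HTTP-like code.
func instrument(handler string, fn func() error) error {
    metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": handler, "code": ""}).Inc()
    start := time.Now()
    err := fn()
    if err == nil {
        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": handler, "code": "200"}).Inc()
        metrics.RequestsDuration.With(prometheus.Labels{"type": "trace", "handler": handler}).Observe(time.Since(start).Seconds())
    } else {
        metrics.RequestsTotal.With(prometheus.Labels{"type": "trace", "handler": handler, "code": "500"}).Inc()
    }
    return err
}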

pkg/pgmodel/cache/metrics.go

Lines changed: 26 additions & 22 deletions
@@ -5,11 +5,13 @@
 package cache
 
 import (
+    "context"
     "fmt"
-    "github.com/prometheus/client_golang/prometheus"
-    "github.com/timescale/promscale/pkg/util"
     "sync/atomic"
     "time"
+
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/timescale/promscale/pkg/util"
 )
 
 var (
@@ -18,18 +20,18 @@ var (
             Namespace: util.PromNamespace,
             Subsystem: "cache",
             Name:      "enabled",
-            Help:      "Cache is enalbed or not.",
+            Help:      "Cache is enabled or not.",
         },
-        []string{"subsystem", "name"}, // type => ["trace" or "metric"] and name => name of the cache i.e., metric cache, series cache, etc.
+        []string{"type", "name"}, // type => ["trace" or "metric"] and name => name of the cache i.e., metric cache, series cache, schema cache, etc.
     )
     capacity = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: util.PromNamespace,
             Subsystem: "cache",
             Name:      "capacity",
-            Help:      "Cache is enabled or not.",
+            Help:      "Capacity of cache in terms of elements count.",
         },
-        []string{"subsystem", "name"},
+        []string{"type", "name"},
     )
     sizeBytes = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
@@ -38,7 +40,7 @@ var (
             Name: "size_bytes",
             Help: "Cache size in bytes.",
         },
-        []string{"subsystem", "name"},
+        []string{"type", "name"},
     )
     evictionsTotal = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -47,19 +49,18 @@ var (
             Name: "evictions_total",
             Help: "Total evictions in a clockcache.",
         },
-        []string{"subsystem", "name"},
+        []string{"type", "name"},
     )
 )
 
-func init() {
+func InitMetrics(ctx context.Context) {
     prometheus.MustRegister(
         Enabled,
         capacity,
         sizeBytes,
         evictionsTotal,
     )
-    funcs.Store([]updateFunc{})
-    go metricsUpdater()
+    go metricsUpdater(ctx)
 }
 
 const (
@@ -77,30 +78,33 @@ type updateFunc struct {
 
 var funcs atomic.Value
 
+func init() {
+    funcs.Store([]updateFunc{})
+}
+
 // RegisterUpdateFunc updates some metrics like SizeBytes and Capacity every 30 secs.
 // Earlier these were done via supplying a func to NewGaugeFunc that called that func
 // when prometheus scraped. But now we have labels, and we have to use NewGaugeVec
 // which does not allow to implement a func. Hence, we have to choose the routine way
 // in order to update these metrics.
 func RegisterUpdateFunc(kind MetricKind, update func(metric prometheus.Collector)) {
     l := funcs.Load().([]updateFunc)
-    switch kind {
-    case Cap:
-        l = append(l, updateFunc{kind, update})
-    case Size:
-        l = append(l, updateFunc{kind, update})
-    case Evict:
-        l = append(l, updateFunc{kind, update})
-    default:
+    if !(kind == Cap || kind == Size || kind == Evict) {
         panic(fmt.Sprintf("invalid kind %d", kind))
     }
+    l = append(l, updateFunc{kind, update})
     funcs.Store(l)
 }
 
-func metricsUpdater() {
-    update := time.NewTicker(time.Second * 10)
+func metricsUpdater(ctx context.Context) {
+    update := time.NewTicker(time.Second * 30)
     defer update.Stop()
-    for range update.C {
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-update.C:
+        }
         if len(funcs.Load().([]updateFunc)) == 0 {
             continue
         }
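
The change from init() to InitMetrics(ctx) above replaces an unconditional `for range update.C` loop with one that also watches ctx.Done(), so the updater goroutine stops when its caller cancels the context. A minimal, self-contained sketch of that pattern (generic Go, not the promscale code; only the 30-second interval is taken from the diff):

package main

import (
    "context"
    "fmt"
    "time"
)

// updater mirrors the shape of metricsUpdater above: tick periodically,
// but return as soon as the context is cancelled.
func updater(ctx context.Context) {
    tick := time.NewTicker(30 * time.Second)
    defer tick.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
        }
        fmt.Println("refresh cache gauges here")
    }
}

func main() {
    // The caller owns the lifetime, just as runner.go cancels the context
    // it hands to cache.InitMetrics on shutdown.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go updater(ctx)
    time.Sleep(time.Second)
}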

pkg/pgmodel/ingestor/trace/cache.go

Lines changed: 4 additions & 4 deletions
@@ -30,17 +30,17 @@ func newTagCache() *clockcache.Cache {
 }
 
 func registerToMetrics(cacheKind string, c *clockcache.Cache) {
-    pgmodelCache.Enabled.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind})
+    pgmodelCache.Enabled.With(prometheus.Labels{"type": "trace", "name": cacheKind})
     pgmodelCache.RegisterUpdateFunc(pgmodelCache.Cap, func(collector prometheus.Collector) {
         metric := collector.(*prometheus.GaugeVec)
-        metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.Cap()))
+        metric.With(prometheus.Labels{"type": "trace", "name": cacheKind}).Set(float64(c.Cap()))
     })
     pgmodelCache.RegisterUpdateFunc(pgmodelCache.Size, func(collector prometheus.Collector) {
         metric := collector.(*prometheus.GaugeVec)
-        metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Set(float64(c.SizeBytes()))
+        metric.With(prometheus.Labels{"type": "trace", "name": cacheKind}).Set(float64(c.SizeBytes()))
     })
     pgmodelCache.RegisterUpdateFunc(pgmodelCache.Evict, func(collector prometheus.Collector) {
         metric := collector.(*prometheus.CounterVec)
-        metric.With(prometheus.Labels{"subsystem": "trace", "name": cacheKind}).Add(float64(c.Evictions()))
+        metric.With(prometheus.Labels{"type": "trace", "name": cacheKind}).Add(float64(c.Evictions()))
     })
 }

pkg/pgmodel/ingestor/trace/writer.go

Lines changed: 6 additions & 6 deletions
@@ -133,10 +133,10 @@ func getServiceName(rSpan pdata.ResourceSpans) string {
 
 func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces) error {
     ingestStart := time.Now()
-    metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Inc()
+    metrics.ActiveWriteRequests.With(prometheus.Labels{"type": "trace", "kind": "spans"}).Inc()
     defer func() {
-        metrics.ActiveWriteRequests.With(prometheus.Labels{"subsystem": "trace"}).Dec()
-        metrics.IngestDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(ingestStart).Seconds())
+        metrics.ActiveWriteRequests.With(prometheus.Labels{"type": "trace", "kind": "spans"}).Dec()
+        metrics.IngestDuration.With(prometheus.Labels{"type": "trace"}).Observe(time.Since(ingestStart).Seconds())
     }()
 
     rSpans := traces.ResourceSpans()
@@ -315,9 +315,9 @@ func (t *traceWriterImpl) InsertTraces(ctx context.Context, traces pdata.Traces)
         return fmt.Errorf("error sending trace batches: %w", err)
     }
 
-    metrics.InsertDuration.With(prometheus.Labels{"subsystem": "trace"}).Observe(time.Since(start).Seconds())
-    metrics.IngestedTotal.With(prometheus.Labels{"type": "spans"}).Add(float64(traces.SpanCount()))
-    metrics.MaxSentTimestamp.With(prometheus.Labels{"subsystem": "trace"}).Set(float64(maxEndTimestamp))
+    metrics.InsertDuration.With(prometheus.Labels{"type": "trace"}).Observe(time.Since(start).Seconds())
+    metrics.IngestedTotal.With(prometheus.Labels{"type": "trace", "kind": "spans"}).Add(float64(traces.SpanCount()))
+    metrics.MaxSentTimestamp.With(prometheus.Labels{"type": "trace"}).Set(float64(maxEndTimestamp))
 
     // Only report telemetry if ingestion successful.
     tput.ReportSpansProcessed(timestamp.FromTime(time.Now()), traces.SpanCount())

pkg/pgmodel/metrics/metrics.go

Lines changed: 7 additions & 7 deletions
@@ -72,7 +72,7 @@ var (
             Name: "ingested_total",
             Help: "Total number of insertables ingested in the database.",
         },
-        []string{"type"},
+        []string{"kind", "type"},
     )
     ActiveWriteRequests = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
@@ -81,7 +81,7 @@ var (
             Name: "active_write_requests",
             Help: "Number of write requests that are active in the ingestion pipeline.",
         },
-        []string{"subsystem"},
+        []string{"type", "kind"},
     )
     InsertDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -91,7 +91,7 @@ var (
             Help:    "Time taken to insert a batch of samples or traces into the database.",
             Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
         },
-        []string{"subsystem"},
+        []string{"type"},
     )
     IngestDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -101,7 +101,7 @@ var (
             Help:    "Time taken to process (including filling up caches) and insert a batch of samples or traces into the database.",
             Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
         },
-        []string{"subsystem"},
+        []string{"type"},
     )
     MaxSentTimestamp = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
@@ -110,7 +110,7 @@ var (
             Name: "max_sent_timestamp_milliseconds",
             Help: "Maximum sent timestamp into the database. For samples, it is the sample timestamp and for traces, it is the maximum end timestamp.",
         },
-        []string{"subsystem"},
+        []string{"type"},
     )
     RequestsDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -120,7 +120,7 @@ var (
             Help:    "Time taken by function to respond to query.",
             Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 50, 100, 250, 500, 1000, 2500},
         },
-        []string{"subsystem", "handler"},
+        []string{"type", "handler"},
     )
     RequestsTotal = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -129,7 +129,7 @@ var (
             Name: "requests_total",
             Help: "Total query requests.",
         },
-        []string{"subsystem", "handler", "code"},
+        []string{"type", "handler", "code"},
     )
 )
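
With these widened label sets, the trace ingest path shown earlier (pkg/pgmodel/ingestor/trace/writer.go) separates the signal path from the unit being counted; for example, IngestedTotal is now updated as in that diff (fragment, assuming the same imports):

// "type" identifies the pipeline (metric or trace), "kind" what is being counted.
metrics.IngestedTotal.With(prometheus.Labels{
    "type": "trace",
    "kind": "spans",
}).Add(float64(traces.SpanCount()))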

pkg/runner/runner.go

Lines changed: 5 additions & 0 deletions
@@ -29,6 +29,7 @@ import (
     "github.com/timescale/promscale/pkg/api"
     "github.com/timescale/promscale/pkg/jaeger/query"
     "github.com/timescale/promscale/pkg/log"
+    "github.com/timescale/promscale/pkg/pgmodel/cache"
     "github.com/timescale/promscale/pkg/telemetry"
     "github.com/timescale/promscale/pkg/thanos"
     "github.com/timescale/promscale/pkg/tracer"
@@ -104,6 +105,10 @@ func Run(cfg *Config) error {
         }(ctx)
     }
 
+    cacheMetricsCtx, stopCacheMetricsRoutine := context.WithCancel(context.Background())
+    defer stopCacheMetricsRoutine()
+    cache.InitMetrics(cacheMetricsCtx)
+
     promMetrics := api.InitMetrics()
     client, err := CreateClient(cfg, promMetrics)
     if err != nil {

scripts/end_to_end_tests.sh

Lines changed: 2 additions & 2 deletions
@@ -209,8 +209,8 @@ compare_connector_and_prom "series?match%5B%5D=ts_prom_sent_samples_total"
 # Labels endpoint cannot be compared to Prometheus becuase it will always differ due to direct backfilling of the real dataset.
 # We have to compare it to the correct expected output. Note that `namespace` and `node` labels are from JSON import payload,
 # while `custom_label` label is from text format write request.
-EXPECTED_OUTPUT1='{"status":"success","data":["__name__","code","custom_label","handler","instance","job","le","method","mode","namespace","node","path","quantile","status","version"]}'
-EXPECTED_OUTPUT2='{"status":"success","data":["__name__","code","custom_label","handler","instance","job","le","method","mode","namespace","node","path","quantile","status"]}'
+EXPECTED_OUTPUT1='{"status":"success","data":["__name__","code","custom_label","handler","instance","job","le","method","mode","name","namespace","node","path","quantile","status","type","version"]}'
+EXPECTED_OUTPUT2='{"status":"success","data":["__name__","code","custom_label","handler","instance","job","le","method","mode","name","namespace","node","path","quantile","type","status"]}'
 LABELS_OUTPUT=$(curl -s "http://${CONNECTOR_URL}/api/v1/labels")
 
 if [ "${LABELS_OUTPUT}" != "${EXPECTED_OUTPUT1}" ] && [ "${LABELS_OUTPUT}" != "${EXPECTED_OUTPUT2}" ]; then
