Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit d036197

Browse files
committed
Implement ingested_spans_count in telemetry data.
Signed-off-by: Harkishen-Singh <[email protected]>
1 parent e7eda92 commit d036197

File tree

17 files changed

+118
-91
lines changed

17 files changed

+118
-91
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ We use the following categories for changes:
1616

1717
### Added
1818
- Add Prometheus metrics support for Tracing [#1102]
19+
- Add ingested spans-count to telemetry [#1155]
1920

2021
### Fixed
2122
- Fix spans with end < start. Start and end are swapped in this case. [#1096]

pkg/migrations/migration_files_generated.go

Lines changed: 25 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/migrations/sql/idempotent/telemetry.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ $$
3636
)
3737
UPDATE _ps_catalog.promscale_instance_information SET
3838
promscale_ingested_samples_total = promscale_ingested_samples_total + COALESCE(x.del_promscale_ingested_samples_total, 0),
39+
promscale_ingested_spans_total = promscale_ingested_spans_total + COALESCE(x.del_promscale_ingested_spans_total, 0),
3940
promscale_metrics_queries_success_total = promscale_metrics_queries_success_total + COALESCE(x.del_promscale_metrics_queries_success_total, 0),
4041
promscale_metrics_queries_timedout_total = promscale_metrics_queries_timedout_total + COALESCE(x.del_promscale_metrics_queries_timedout_total, 0),
4142
promscale_metrics_queries_failed_total = promscale_metrics_queries_failed_total + COALESCE(x.del_promscale_metrics_queries_failed_total, 0),
@@ -46,6 +47,7 @@ $$
4647
(
4748
SELECT
4849
sum(promscale_ingested_samples_total) as del_promscale_ingested_samples_total,
50+
sum(promscale_ingested_spans_total) as del_promscale_ingested_spans_total,
4951
sum(promscale_metrics_queries_success_total) as del_promscale_metrics_queries_success_total,
5052
sum(promscale_metrics_queries_timedout_total) as del_promscale_metrics_queries_timedout_total,
5153
sum(promscale_metrics_queries_failed_total) as del_promscale_metrics_queries_failed_total,

pkg/migrations/sql/preinstall/012-telemetry.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ CREATE TABLE IF NOT EXISTS _ps_catalog.promscale_instance_information (
22
uuid UUID NOT NULL PRIMARY KEY,
33
last_updated TIMESTAMPTZ NOT NULL,
44
promscale_ingested_samples_total BIGINT DEFAULT 0 NOT NULL,
5-
promscale_metrics_queries_success_total BIGINT DEFAULT 0 NOT NULL,
5+
promscale_ingested_spans_total BIGINT DEFAULT 0 NOT NULL,
6+
promscale_metrics_queries_success_total BIGINT DEFAULT 0 NOT NULL,
67
promscale_metrics_queries_timedout_total BIGINT DEFAULT 0 NOT NULL,
78
promscale_metrics_queries_failed_total BIGINT DEFAULT 0 NOT NULL,
89
promscale_trace_query_requests_executed_total BIGINT DEFAULT 0 NOT NULL,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE _ps_catalog.promscale_instance_information
2+
ADD COLUMN promscale_ingested_spans_total BIGINT NOT NULL DEFAULT 0 ;

pkg/pgclient/client.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"net/url"
1111

12+
"github.com/google/uuid"
1213
"github.com/jackc/pgx/v4"
1314
"github.com/jackc/pgx/v4/pgxpool"
1415
"github.com/timescale/promscale/pkg/ha"
@@ -22,10 +23,22 @@ import (
2223
"github.com/timescale/promscale/pkg/prompb"
2324
"github.com/timescale/promscale/pkg/promql"
2425
"github.com/timescale/promscale/pkg/query"
26+
"github.com/timescale/promscale/pkg/telemetry"
2527
"github.com/timescale/promscale/pkg/tenancy"
2628
"go.opentelemetry.io/collector/model/pdata"
2729
)
2830

31+
var PromscaleID uuid.UUID
32+
33+
func init() {
34+
// PromscaleID must always be generated on start, so that it remains constant throughout the lifecycle.
35+
PromscaleID = uuid.New()
36+
}
37+
38+
// Post connect validation function, useful for things such as acquiring locks
39+
// that should live the duration of the connection
40+
type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
41+
2942
// Client sends Prometheus samples to TimescaleDB
3043
type Client struct {
3144
Connection pgxconn.PgxConn
@@ -41,12 +54,9 @@ type Client struct {
4154
closePool bool
4255
sigClose chan struct{}
4356
haService *ha.Service
57+
TelemetryEngine telemetry.Engine
4458
}
4559

46-
// Post connect validation function, useful for things such as acquiring locks
47-
// that should live the duration of the connection
48-
type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
49-
5060
// NewClient creates a new PostgreSQL client
5161
func NewClient(cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
5262
pgConfig, numCopiers, err := getPgConfig(cfg)
@@ -140,25 +150,34 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
140150
AsyncAcks: cfg.AsyncAcks,
141151
}
142152

153+
labelsReader := lreader.NewLabelsReader(dbConn, labelsCache)
154+
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)
155+
156+
dbQuerierConn := pgxconn.NewQueryLoggingPgxConn(connPool)
157+
dbQuerier := querier.NewQuerier(dbQuerierConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
158+
queryable := query.NewQueryable(dbQuerier, labelsReader)
159+
143160
var (
144-
dbIngestor *ingestor.DBIngestor
145-
exemplarKeyPosCache = cache.NewExemplarLabelsPosCache(cfg.CacheConfig)
161+
err error
162+
telemetryEngine telemetry.Engine
146163
)
164+
telemetryEngine, err = telemetry.NewEngine(dbConn, PromscaleID, queryable)
165+
if err != nil {
166+
log.Debug("msg", "err creating telemetry engine", "err", err.Error())
167+
telemetryEngine = telemetry.NewNoopEngine()
168+
}
169+
telemetryEngine.Start() // We stop the engine at client.Close().
170+
171+
var dbIngestor *ingestor.DBIngestor
147172
if !readOnly {
148173
var err error
149-
dbIngestor, err = ingestor.NewPgxIngestor(dbConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
174+
dbIngestor, err = ingestor.NewPgxIngestor(dbConn, metricsCache, seriesCache, exemplarKeyPosCache, &c, telemetryEngine)
150175
if err != nil {
151176
log.Error("msg", "err starting the ingestor", "err", err)
152177
return nil, err
153178
}
154179
}
155180

156-
labelsReader := lreader.NewLabelsReader(dbConn, labelsCache)
157-
158-
dbQuerierConn := pgxconn.NewQueryLoggingPgxConn(connPool)
159-
dbQuerier := querier.NewQuerier(dbQuerierConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
160-
queryable := query.NewQueryable(dbQuerier, labelsReader)
161-
162181
healthChecker := health.NewHealthChecker(dbConn)
163182
client := &Client{
164183
Connection: dbConn,
@@ -171,6 +190,7 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
171190
labelsCache: labelsCache,
172191
seriesCache: seriesCache,
173192
sigClose: sigClose,
193+
TelemetryEngine: telemetryEngine,
174194
}
175195

176196
InitClientMetrics(client)
@@ -180,6 +200,9 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
180200
// Close closes the client and performs cleanup
181201
func (c *Client) Close() {
182202
log.Info("msg", "Shutting down Client")
203+
if c.TelemetryEngine != nil {
204+
c.TelemetryEngine.Stop()
205+
}
183206
if c.ingestor != nil {
184207
c.ingestor.Close()
185208
}

pkg/pgmodel/ingestor/ingestor.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/timescale/promscale/pkg/pgmodel/model"
2020
"github.com/timescale/promscale/pkg/pgxconn"
2121
"github.com/timescale/promscale/pkg/prompb"
22+
"github.com/timescale/promscale/pkg/telemetry"
2223
"github.com/timescale/promscale/pkg/tracer"
2324
)
2425

@@ -38,15 +39,15 @@ type DBIngestor struct {
3839

3940
// NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache
4041
// for caching metric table names.
41-
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) {
42+
func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg, t telemetry.Engine) (*DBIngestor, error) {
4243
dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg)
4344
if err != nil {
4445
return nil, err
4546
}
4647
return &DBIngestor{
4748
sCache: sCache,
4849
dispatcher: dispatcher,
49-
tWriter: trace.NewWriter(conn),
50+
tWriter: trace.NewWriter(conn, t),
5051
}, nil
5152
}
5253

@@ -60,7 +61,7 @@ func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error)
6061
c := cache.NewMetricCache(cacheConfig)
6162
s := cache.NewSeriesCache(cacheConfig, nil)
6263
e := cache.NewExemplarLabelsPosCache(cacheConfig)
63-
return NewPgxIngestor(conn, c, s, e, cfg)
64+
return NewPgxIngestor(conn, c, s, e, cfg, telemetry.NewNoopEngine())
6465
}
6566

6667
const (

pkg/pgmodel/ingestor/trace/writer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import (
1717

1818
"github.com/jackc/pgtype"
1919
"github.com/timescale/promscale/pkg/clockcache"
20+
"github.com/timescale/promscale/pkg/log"
2021
"github.com/timescale/promscale/pkg/pgmodel/metrics"
2122
"github.com/timescale/promscale/pkg/pgxconn"
23+
"github.com/timescale/promscale/pkg/telemetry"
2224
tput "github.com/timescale/promscale/pkg/util/throughput"
2325
)
2426

@@ -59,7 +61,10 @@ type traceWriterImpl struct {
5961
tagCache *clockcache.Cache
6062
}
6163

62-
func NewWriter(conn pgxconn.PgxConn) *traceWriterImpl {
64+
func NewWriter(conn pgxconn.PgxConn, t telemetry.Engine) *traceWriterImpl {
65+
if err := t.RegisterMetric("promscale_ingested_spans_total", metrics.IngestorItems.With(prometheus.Labels{"type": "trace", "kind": "span"})); err != nil {
66+
log.Debug("msg", "err registering telemetry metric promscale_ingested_spans_total", "err", err.Error())
67+
}
6368
return &traceWriterImpl{
6469
conn: conn,
6570
schemaCache: newSchemaCache(),

pkg/runner/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// This file and its contents are licensed under the Apache License 2.0.
22
// Please see the included NOTICE for copyright information and
33
// LICENSE for a copy of the license.
4+
45
package runner
56

67
import (
@@ -10,7 +11,6 @@ import (
1011
"strconv"
1112

1213
"github.com/jackc/pgx/v4"
13-
"github.com/timescale/promscale/pkg/api"
1414
"github.com/timescale/promscale/pkg/dataset"
1515
"github.com/timescale/promscale/pkg/log"
1616
"github.com/timescale/promscale/pkg/pgclient"
@@ -27,7 +27,7 @@ var (
2727
migrationLockError = fmt.Errorf("Could not acquire migration lock. Ensure there are no other connectors running and try again.")
2828
)
2929

30-
func CreateClient(cfg *Config, promMetrics *api.Metrics) (*pgclient.Client, error) {
30+
func CreateClient(cfg *Config) (*pgclient.Client, error) {
3131
// The TimescaleDB migration has to happen before other connections
3232
// are open, also it has to happen as the first command on a connection.
3333
// Thus we cannot rely on the migration lock here. Instead we assume

0 commit comments

Comments
 (0)