Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit e7eae31

Browse files
Add database metrics into pgxconn package.
Signed-off-by: Harkishen-Singh <[email protected]>

This commit updates the implementation of pgx in the pkg/pgxconn package and implements the following database metrics:
1. promscale_database_requests_total
2. promscale_database_request_errors_total
3. promscale_database_requests_duration_seconds
1 parent 71acec0 commit e7eae31

File tree

3 files changed

+140
-8
lines changed

3 files changed

+140
-8
lines changed

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,8 +1214,6 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08
12141214
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
12151215
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
12161216
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
1217-
github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w=
1218-
github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw=
12191217
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d h1:8r5Lurk3Ur6K9Tz94Zz3kekJLZ8iVAG/GWHNqtLlmnw=
12201218
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw=
12211219
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=

pkg/pgclient/client.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,15 @@ import (
88
"context"
99
"fmt"
1010
"net/url"
11+
"os"
12+
"time"
1113

1214
"github.com/google/uuid"
1315
"github.com/jackc/pgx/v4"
1416
"github.com/jackc/pgx/v4/pgxpool"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"go.opentelemetry.io/collector/model/pdata"
19+
1520
"github.com/timescale/promscale/pkg/ha"
1621
"github.com/timescale/promscale/pkg/log"
1722
"github.com/timescale/promscale/pkg/pgmodel/cache"
@@ -25,7 +30,7 @@ import (
2530
"github.com/timescale/promscale/pkg/query"
2631
"github.com/timescale/promscale/pkg/telemetry"
2732
"github.com/timescale/promscale/pkg/tenancy"
28-
"go.opentelemetry.io/collector/model/pdata"
33+
"github.com/timescale/promscale/pkg/util"
2934
)
3035

3136
var PromscaleID uuid.UUID
@@ -55,6 +60,7 @@ type Client struct {
5560
sigClose chan struct{}
5661
haService *ha.Service
5762
TelemetryEngine telemetry.Engine
63+
stopHealthChecker context.CancelFunc
5864
}
5965

6066
// NewClient creates a new PostgreSQL client
@@ -179,19 +185,22 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
179185
}
180186
}
181187

182-
healthChecker := health.NewHealthChecker(dbConn)
188+
healthCheckerCtx, stopHealthChecker := context.WithCancel(context.Background())
189+
healthCheckRoutine(healthCheckerCtx, dbConn)
190+
183191
client := &Client{
184192
Connection: dbConn,
185193
QuerierConnection: dbQuerierConn,
186194
ingestor: dbIngestor,
187195
querier: dbQuerier,
188-
healthCheck: healthChecker,
196+
healthCheck: health.NewHealthChecker(dbConn),
189197
queryable: queryable,
190198
metricCache: metricsCache,
191199
labelsCache: labelsCache,
192200
seriesCache: seriesCache,
193201
sigClose: sigClose,
194202
TelemetryEngine: telemetryEngine,
203+
stopHealthChecker: stopHealthChecker,
195204
}
196205

197206
InitClientMetrics(client)
@@ -201,6 +210,9 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
201210
// Close closes the client and performs cleanup
202211
func (c *Client) Close() {
203212
log.Info("msg", "Shutting down Client")
213+
if c.stopHealthChecker != nil {
214+
c.stopHealthChecker()
215+
}
204216
if c.TelemetryEngine != nil {
205217
c.TelemetryEngine.Stop()
206218
}
@@ -297,3 +309,43 @@ func observeStatementCacheState(conn *pgx.Conn) bool {
297309
statementCacheLen.Observe(float64(statementCacheSize))
298310
return true
299311
}
312+
313+
func healthCheckRoutine(ctx context.Context, conn pgxconn.PgxConn) {
314+
r := prometheus.DefaultRegisterer
315+
if env := os.Getenv("IS_TEST"); env == "true" {
316+
r = prometheus.NewRegistry()
317+
}
318+
dbHealthChecks := prometheus.NewCounter(
319+
prometheus.CounterOpts{
320+
Namespace: util.PromNamespace,
321+
Subsystem: "database",
322+
Name: "health_checks_total",
323+
Help: "Total number of database health checks performed.",
324+
},
325+
)
326+
dbHealthErrors := prometheus.NewCounter(
327+
prometheus.CounterOpts{
328+
Namespace: util.PromNamespace,
329+
Subsystem: "database",
330+
Name: "health_check_errors_total",
331+
Help: "Total number of database health check errors.",
332+
},
333+
)
334+
r.MustRegister(dbHealthChecks, dbHealthErrors)
335+
go func() {
336+
check := time.NewTicker(time.Minute)
337+
defer check.Stop()
338+
connection := health.NewHealthChecker(conn)
339+
for {
340+
select {
341+
case <-ctx.Done():
342+
return
343+
case <-check.C:
344+
}
345+
dbHealthChecks.Inc()
346+
if err := connection(); err != nil {
347+
dbHealthErrors.Inc()
348+
}
349+
}
350+
}()
351+
}

pkg/pgxconn/pgx_conn.go

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,47 @@ import (
1313
"github.com/jackc/pgconn"
1414
"github.com/jackc/pgx/v4"
1515
"github.com/jackc/pgx/v4/pgxpool"
16+
"github.com/prometheus/client_golang/prometheus"
17+
1618
"github.com/timescale/promscale/pkg/log"
19+
"github.com/timescale/promscale/pkg/util"
20+
)
21+
22+
var (
23+
requestTotal = prometheus.NewCounterVec(
24+
prometheus.CounterOpts{
25+
Namespace: util.PromNamespace,
26+
Subsystem: "database",
27+
Name: "requests_total",
28+
Help: "Total number of database requests.",
29+
}, []string{"method"},
30+
)
31+
errorsTotal = prometheus.NewCounterVec(
32+
prometheus.CounterOpts{
33+
Namespace: util.PromNamespace,
34+
Subsystem: "database",
35+
Name: "request_errors_total",
36+
Help: "Total number of database request errors.",
37+
}, []string{"method"},
38+
)
39+
duration = prometheus.NewHistogramVec(
40+
prometheus.HistogramOpts{
41+
Namespace: util.PromNamespace,
42+
Subsystem: "database",
43+
Name: "requests_duration_seconds",
44+
Help: "Time taken to complete a database request.",
45+
}, []string{"method"},
46+
)
1747
)
1848

49+
func init() {
50+
prometheus.MustRegister(requestTotal, errorsTotal, duration)
51+
}
52+
53+
func getLabelSet(method string) prometheus.Labels {
54+
return prometheus.Labels{"method": method}
55+
}
56+
1957
type PgxBatch interface {
2058
Queue(query string, arguments ...interface{})
2159
Len() int
@@ -109,15 +147,53 @@ func (p *connImpl) Close() {
109147
}
110148

111149
func (p *connImpl) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
112-
return p.Conn.Exec(ctx, sql, args...)
150+
lset := getLabelSet("exec")
151+
requestTotal.With(lset).Inc()
152+
start := time.Now()
153+
defer func() {
154+
duration.With(lset).Observe(time.Since(start).Seconds())
155+
}()
156+
tag, err := p.Conn.Exec(ctx, sql, args...)
157+
if err != nil {
158+
errorsTotal.With(lset).Inc()
159+
}
160+
return tag, err
113161
}
114162

115163
func (p *connImpl) Query(ctx context.Context, sql string, args ...interface{}) (PgxRows, error) {
116-
return p.Conn.Query(ctx, sql, args...)
164+
lset := getLabelSet("query")
165+
requestTotal.With(lset).Inc()
166+
start := time.Now()
167+
defer func() {
168+
duration.With(lset).Observe(time.Since(start).Seconds())
169+
}()
170+
rows, err := p.Conn.Query(ctx, sql, args...)
171+
if err != nil {
172+
errorsTotal.With(lset).Inc()
173+
}
174+
return rows, err
117175
}
118176

119177
func (p *connImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
120-
return p.Conn.QueryRow(ctx, sql, args...)
178+
lset := getLabelSet("query_row")
179+
requestTotal.With(lset).Inc()
180+
start := time.Now()
181+
defer func() {
182+
duration.With(lset).Observe(time.Since(start).Seconds())
183+
}()
184+
return &rowWrapper{p.Conn.QueryRow(ctx, sql, args...)}
185+
}
186+
187+
type rowWrapper struct {
188+
r pgx.Row
189+
}
190+
191+
func (w *rowWrapper) Scan(dest ...interface{}) error {
192+
err := w.r.Scan(dest...)
193+
if err != nil {
194+
errorsTotal.With(getLabelSet("query_row")).Inc()
195+
}
196+
return err
121197
}
122198

123199
func (p *connImpl) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
@@ -133,6 +209,12 @@ func (p *connImpl) NewBatch() PgxBatch {
133209
}
134210

135211
func (p *connImpl) SendBatch(ctx context.Context, b PgxBatch) (pgx.BatchResults, error) {
212+
lset := getLabelSet("send_batch")
213+
requestTotal.With(lset).Inc()
214+
start := time.Now()
215+
defer func() {
216+
duration.With(lset).Observe(time.Since(start).Seconds())
217+
}()
136218
return p.Conn.SendBatch(ctx, b.(*pgx.Batch)), nil
137219
}
138220

0 commit comments

Comments
 (0)