Commit a96d677 (1 parent: 0a60ca8)

Probe database for connection limit, and make it configurable

This commit contains two changes:

1. At startup, we now probe the database to determine how many connections it can handle, so that we never exceed that limit.
2. We add a command-line flag to override this setting for instances where our heuristic is incorrect.

This should make the connector a better-behaved client of the database, making it much less likely to starve out other clients, or itself.
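
Before the diff, the core idea in miniature: the probe is a single SHOW max_connections query, and the connector budgets 80% of the result. A minimal sketch, assuming a placeholder DSN (the real connector assembles its connection string from its parsed config):

    package main

    import (
    	"context"
    	"fmt"

    	pgx "github.com/jackc/pgx/v4"
    )

    func main() {
    	ctx := context.Background()
    	// Placeholder DSN for illustration; substitute your own.
    	conn, err := pgx.Connect(ctx, "host=localhost user=postgres dbname=postgres")
    	if err != nil {
    		panic(err)
    	}
    	defer func() { _ = conn.Close(ctx) }()

    	// PostgreSQL reports its limit via SHOW; pgx scans the text
    	// result directly into an int, as the commit below does.
    	var maxConns int
    	if err := conn.QueryRow(ctx, "SHOW max_connections").Scan(&maxConns); err != nil {
    		panic(err)
    	}
    	// The commit budgets 80% of the server's limit for this client.
    	fmt.Println("connection budget:", int(0.8*float32(maxConns)))
    }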

File tree (2 files changed: 74 additions, 33 deletions)

  pkg/pgclient/client.go
  pkg/pgmodel/sql_ingest.go

pkg/pgclient/client.go (69 additions & 21 deletions)
@@ -8,6 +8,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 
+	pgx "github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
 
 	"github.com/timescale/timescale-prometheus/pkg/clockcache"
@@ -21,18 +22,20 @@ import (
 
 // Config for the database
 type Config struct {
-	host             string
-	port             int
-	user             string
-	password         string
-	database         string
-	sslMode          string
-	dbConnectRetries int
-	AsyncAcks        bool
-	ReportInterval   int
-	LabelsCacheSize  uint64
-	MetricsCacheSize uint64
-	SeriesCacheSize  uint64
+	host                    string
+	port                    int
+	user                    string
+	password                string
+	database                string
+	sslMode                 string
+	dbConnectRetries        int
+	AsyncAcks               bool
+	ReportInterval          int
+	LabelsCacheSize         uint64
+	MetricsCacheSize        uint64
+	SeriesCacheSize         uint64
+	WriteConnectionsPerProc int
+	MaxConnections          int
 }
 
 // ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB
@@ -48,6 +51,8 @@ func ParseFlags(cfg *Config) *Config {
 	flag.IntVar(&cfg.ReportInterval, "tput-report", 0, "interval in seconds at which throughput should be reported")
 	flag.Uint64Var(&cfg.LabelsCacheSize, "labels-cache-size", 10000, "maximum number of labels to cache")
 	flag.Uint64Var(&cfg.MetricsCacheSize, "metrics-cache-size", pgmodel.DefaultMetricCacheSize, "maximum number of metric names to cache")
+	flag.IntVar(&cfg.WriteConnectionsPerProc, "db-writer-connection-concurrency", 4, "maximum number of database connections per go process writing to the database")
+	flag.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "maximum connections that can be open at once, defaults to 80% of the max the DB can handle")
 	return cfg
 }
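
For illustration, the two new flags wired up in isolation. The config type here is a stripped-down stand-in for the connector's real one; the flag names and defaults are taken verbatim from the hunk above:

    package main

    import (
    	"flag"
    	"fmt"
    )

    // Stand-in for the connector's Config; illustration only.
    type config struct {
    	writeConnectionsPerProc int
    	maxConnections          int
    }

    func main() {
    	var cfg config
    	flag.IntVar(&cfg.writeConnectionsPerProc, "db-writer-connection-concurrency", 4,
    		"maximum number of database connections per go process writing to the database")
    	// -1 is a sentinel: leave it unset and the connector probes the
    	// database's max_connections and takes 80% of it instead.
    	flag.IntVar(&cfg.maxConnections, "db-connections-max", -1,
    		"maximum connections that can be open at once, defaults to 80% of the max the DB can handle")
    	flag.Parse()
    	fmt.Printf("%+v\n", cfg)
    }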

@@ -65,17 +70,14 @@ type Client struct {
 // NewClient creates a new PostgreSQL client
 func NewClient(cfg *Config, readHist prometheus.ObserverVec) (*Client, error) {
 	connectionStr := cfg.GetConnectionStr()
-
-	maxProcs := runtime.GOMAXPROCS(-1)
-	if maxProcs <= 0 {
-		maxProcs = runtime.NumCPU()
-	}
-	if maxProcs <= 0 {
-		maxProcs = 1
+	minConnections, maxConnections, numCopiers, err := cfg.GetNumConnections()
+	if err != nil {
+		log.Error("err configuring number of connections", util.MaskPassword(err.Error()))
+		return nil, err
 	}
-	connectionPool, err := pgxpool.Connect(context.Background(), connectionStr+fmt.Sprintf(" pool_max_conns=%d pool_min_conns=%d", maxProcs*pgmodel.ConnectionsPerProc, maxProcs))
+	connectionPool, err := pgxpool.Connect(context.Background(), connectionStr+fmt.Sprintf(" pool_max_conns=%d pool_min_conns=%d", maxConnections, minConnections))
 
-	log.Info("msg", util.MaskPassword(connectionStr))
+	log.Info("msg", util.MaskPassword(connectionStr), "numCopiers", numCopiers)
 
 	if err != nil {
 		log.Error("err creating connection pool for new client", util.MaskPassword(err.Error()))
@@ -88,6 +90,7 @@ func NewClient(cfg *Config, readHist prometheus.ObserverVec) (*Client, error) {
 		AsyncAcks:       cfg.AsyncAcks,
 		ReportInterval:  cfg.ReportInterval,
 		SeriesCacheSize: cfg.SeriesCacheSize,
+		NumCopiers:      numCopiers,
 	}
 	ingestor, err := pgmodel.NewPgxIngestorWithMetricCache(connectionPool, cache, &c)
 	if err != nil {
@@ -114,6 +117,51 @@ func (cfg *Config) GetConnectionStr() string {
 		cfg.host, cfg.port, cfg.user, cfg.database, cfg.password, cfg.sslMode)
 }
 
+func (cfg *Config) GetNumConnections() (min int, max int, numCopiers int, err error) {
+	maxProcs := runtime.GOMAXPROCS(-1)
+	if cfg.WriteConnectionsPerProc < 1 {
+		return 0, 0, 0, fmt.Errorf("invalid number of connections-per-proc %v, must be at least 1", cfg.WriteConnectionsPerProc)
+	}
+	perProc := cfg.WriteConnectionsPerProc
+	max = cfg.MaxConnections
+	if max < 1 {
+		conn, err := pgx.Connect(context.Background(), cfg.GetConnectionStr())
+		if err != nil {
+			return 0, 0, 0, err
+		}
+		defer func() { _ = conn.Close(context.Background()) }()
+		row := conn.QueryRow(context.Background(), "SHOW max_connections")
+		err = row.Scan(&max)
+		if err != nil {
+			return 0, 0, 0, err
+		}
+		if max <= 1 {
+			log.Warn("msg", "database can only handle 1 connection")
+			return 1, 1, 1, nil
+		}
+		// we try to use only 80% of the database's connections
+		max = int(0.8 * float32(max))
+	}
+
+	// we want to leave some connections for non-copier usages, so in the event
+	// there aren't enough connections available to satisfy our per-process
+	// preferences we'll scale down the number of copiers
+	min = maxProcs
+	if max <= min {
+		log.Warn("msg", fmt.Sprintf("database can only handle %v connections; connector has %v procs", max, maxProcs))
+		return 1, max, max / 2, nil
+	}
+
+	numCopiers = perProc * maxProcs
+	// we leave one connection per-core for non-copier usages
+	if numCopiers+maxProcs > max {
+		log.Warn("msg", fmt.Sprintf("had to reduce the number of copiers due to connection limits: wanted %v, reduced to %v", numCopiers, max/2))
+		numCopiers = max / 2
+	}
+
+	return
+}
+
 // Close closes the client and performs cleanup
 func (c *Client) Close() {
 	c.ingestor.Close()
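
GetNumConnections interleaves the probe with the sizing arithmetic. To check the arithmetic on concrete numbers, here is a self-contained sketch; sizePool is a hypothetical helper that mirrors the logic above, with budget standing in for the already-scaled 80% figure:

    package main

    import "fmt"

    // sizePool mirrors GetNumConnections' sizing (illustration only):
    // budget is the usable connection count, maxProcs the GOMAXPROCS
    // value, perProc the -db-writer-connection-concurrency setting.
    func sizePool(budget, maxProcs, perProc int) (min, max, numCopiers int) {
    	min, max = maxProcs, budget
    	if max <= min {
    		// Not even one connection per proc: shrink everything.
    		return 1, max, max / 2
    	}
    	numCopiers = perProc * maxProcs
    	// Keep one connection per proc free for non-copier work.
    	if numCopiers+maxProcs > max {
    		numCopiers = max / 2
    	}
    	return
    }

    func main() {
    	// max_connections=100 -> budget 80; 8 procs, 4 writers each:
    	fmt.Println(sizePool(80, 8, 4)) // 8 80 32 (32+8 fits under 80)
    	// a small server: budget 24 forces the copier count to 24/2:
    	fmt.Println(sizePool(24, 8, 4)) // 8 24 12
    }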

pkg/pgmodel/sql_ingest.go (5 additions & 12 deletions)
@@ -6,7 +6,6 @@ package pgmodel
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -31,6 +30,7 @@ type Cfg struct {
 	AsyncAcks       bool
 	ReportInterval  int
 	SeriesCacheSize uint64
+	NumCopiers      int
 }
 
 // NewPgxIngestorWithMetricCache returns a new Ingestor that uses connection pool and a metrics cache
@@ -55,21 +55,14 @@ func NewPgxIngestor(c *pgxpool.Pool) (*DBIngestor, error) {
 	return NewPgxIngestorWithMetricCache(c, cache, &Cfg{})
 }
 
-var ConnectionsPerProc = 5
-
 func newPgxInserter(conn pgxConn, cache MetricCache, cfg *Cfg) (*pgxInserter, error) {
 	cmc := make(chan struct{}, 1)
 
-	maxProcs := runtime.GOMAXPROCS(-1)
-	if maxProcs <= 0 {
-		maxProcs = runtime.NumCPU()
-	}
-	if maxProcs <= 0 {
-		maxProcs = 1
+	numCopiers := cfg.NumCopiers
+	if numCopiers < 1 {
+		log.Warn("msg", "num copiers less than 1, setting to 1")
+		numCopiers = 1
 	}
-
-	// we leave one connection per-core for other usages
-	numCopiers := maxProcs*ConnectionsPerProc - maxProcs
 	toCopiers := make(chan copyRequest, numCopiers)
 	for i := 0; i < numCopiers; i++ {
 		go runInserter(conn, toCopiers)
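
The copiers themselves are plain worker goroutines draining one shared channel, so the consumer count, not the producers, bounds the write concurrency. A stripped-down sketch of that pattern (the int requests stand in for the real copyRequest values, and the worker body for runInserter):

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	const numCopiers = 4
    	// Buffered like toCopiers above; requests queue briefly when all
    	// copiers are busy.
    	requests := make(chan int, numCopiers)

    	var wg sync.WaitGroup
    	for i := 0; i < numCopiers; i++ {
    		wg.Add(1)
    		go func(id int) { // analogous to: go runInserter(conn, toCopiers)
    			defer wg.Done()
    			for req := range requests {
    				fmt.Printf("copier %d handled request %d\n", id, req)
    			}
    		}(i)
    	}

    	for i := 0; i < 10; i++ {
    		requests <- i
    	}
    	close(requests)
    	wg.Wait()
    }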
