Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit e8e572e

Browse files
Separate connection pool into reader and writer.
Signed-off-by: Harkishen-Singh <[email protected]> This commit separates the reader and writer connection pools. By default, reader gets 0.3 and writer gets 0.5 times of the allowed connection from the db. But, in case of readonly mode, reader gets 0.5 times the connections from the db.
1 parent dc34480 commit e8e572e

File tree

10 files changed

+172
-158
lines changed

10 files changed

+172
-158
lines changed

pkg/api/delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func deleteHandler(config *Config, client *pgclient.Client) http.HandlerFunc {
7171
if client == nil {
7272
continue
7373
}
74-
pgDelete := deletePkg.PgDelete{Conn: client.Connection}
74+
pgDelete := deletePkg.PgDelete{Conn: client.Connection()}
7575
touchedMetrics, deletedSeriesIDs, rowsDeleted, err := pgDelete.DeleteSeries(matchers, start, end)
7676
if err != nil {
7777
respondErrorWithMessage(w, http.StatusInternalServerError, err, "deleting_series",

pkg/api/metadata.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
3838
return
3939
}
4040
}
41-
data, err := metadata.MetricQuery(client.Connection, metric, int(limit))
41+
data, err := metadata.MetricQuery(client.Connection(), metric, int(limit))
4242
if err != nil {
4343
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
4444
return

pkg/api/router.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload func() error) (*mux.Router, error) {
3333
var writePreprocessors []parser.Preprocessor
3434
if apiConf.HighAvailability {
35-
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
35+
service := ha.NewService(haClient.NewLeaseClient(client.Connection()))
3636
writePreprocessors = append(writePreprocessors, ha.NewFilter(service))
3737
}
3838
if apiConf.MultiTenancy != nil {
@@ -104,7 +104,7 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
104104
reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload, apiConf.AdminAPIEnabled))
105105
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)
106106

107-
jaeger.ExtendQueryAPIs(router, client.Connection, query)
107+
jaeger.ExtendQueryAPIs(router, client.Connection(), query)
108108

109109
debugProf := router.PathPrefix("/debug/pprof").Subrouter()
110110
debugProf.Path("").Methods(http.MethodGet).HandlerFunc(pprof.Index)

pkg/pgclient/client.go

Lines changed: 110 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -33,84 +33,111 @@ type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
3333

3434
// Client sends Prometheus samples to TimescaleDB
3535
type Client struct {
36-
Connection pgxconn.PgxConn
37-
QuerierConnection pgxconn.PgxConn
38-
ingestor *ingestor.DBIngestor
39-
querier querier.Querier
40-
promqlEngine *promql.Engine
41-
healthCheck health.HealthCheckerFn
42-
queryable promql.Queryable
43-
ConnectionStr string
44-
metricCache cache.MetricCache
45-
labelsCache cache.LabelsCache
46-
seriesCache cache.SeriesCache
47-
closePool bool
48-
sigClose chan struct{}
49-
haService *ha.Service
36+
readerConn pgxconn.PgxConn
37+
ingestor *ingestor.DBIngestor
38+
querier querier.Querier
39+
promqlEngine *promql.Engine
40+
healthCheck health.HealthCheckerFn
41+
queryable promql.Queryable
42+
metricCache cache.MetricCache
43+
labelsCache cache.LabelsCache
44+
seriesCache cache.SeriesCache
45+
closePool bool
46+
connPools []pgxconn.PgxConn
47+
sigClose chan struct{}
48+
haService *ha.Service
5049
}
5150

5251
// NewClient creates a new PostgreSQL client
5352
func NewClient(cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
54-
pgConfig, numCopiers, err := getPgConfig(cfg)
53+
var (
54+
err error
55+
writerPoolSize int
56+
numCopiers int
57+
writerPool *pgxpool.Pool
58+
writerPgConfig *pgxpool.Config
59+
defaultReaderPool = 0.5
60+
)
61+
if !readOnly {
62+
defaultReaderPool = 0.3 // Since defaultReaderPool + defaultWriterPool should be 0.8 or 80% of allowed database connections.
63+
writerPoolSize, err = cfg.GetPoolSize("writer", 0.5, cfg.WriterPoolSize)
64+
if err != nil {
65+
return nil, fmt.Errorf("get writer pool size: %w", err)
66+
}
67+
numCopiers, err = cfg.GetNumCopiers()
68+
if err != nil {
69+
return nil, fmt.Errorf("get num copiers: %w", err)
70+
}
71+
72+
if writerPoolSize >= numCopiers {
73+
log.Warn("msg", "writer-pool size greater than number of copiers. Decreasing copiers to leave some connections for miscellaneous tasks")
74+
numCopiers /= 2
75+
}
76+
77+
writerPgConfig, err = cfg.getPgConfig(writerPoolSize)
78+
if err != nil {
79+
return nil, fmt.Errorf("get writer pg-config: %w", err)
80+
}
81+
writerPgConfig.AfterConnect = schemaLocker
82+
writerPool, err = pgxpool.ConnectConfig(context.Background(), writerPgConfig)
83+
if err != nil {
84+
return nil, fmt.Errorf("err creating writer connection pool: %w", err)
85+
}
86+
}
87+
88+
readerPoolSize, err := cfg.GetPoolSize("reader", defaultReaderPool, cfg.ReaderPoolSize)
5589
if err != nil {
56-
return nil, err
90+
return nil, fmt.Errorf("get reader pool size: %w", err)
91+
}
92+
readerPgConfig, err := cfg.getPgConfig(readerPoolSize)
93+
if err != nil {
94+
return nil, fmt.Errorf("get reader pg-config: %w", err)
5795
}
5896

59-
pgConfig.AfterConnect = schemaLocker
60-
connectionPool, err := pgxpool.ConnectConfig(context.Background(), pgConfig)
97+
statementCacheLog := "disabled"
98+
if cfg.EnableStatementsCache {
99+
statementCacheLog = "512" // Default pgx.
100+
}
101+
log.Info("msg", getRedactedConnStr(cfg.GetConnectionStr()))
102+
log.Info("msg", "runtime",
103+
"writer-pool.size", writerPoolSize,
104+
"reader-pool.size", readerPoolSize,
105+
"min-pool.connections", minPoolSize,
106+
"num-copiers", numCopiers,
107+
"statement-cache", statementCacheLog)
108+
109+
readerPgConfig.AfterConnect = schemaLocker
110+
readerPool, err := pgxpool.ConnectConfig(context.Background(), readerPgConfig)
61111
if err != nil {
62-
log.Error("msg", "err creating connection pool for new client", "err", err.Error())
63-
return nil, err
112+
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
64113
}
65114

66-
client, err := NewClientWithPool(cfg, numCopiers, connectionPool, mt, readOnly)
115+
client, err := NewClientWithPool(cfg, numCopiers, writerPool, readerPool, mt, readOnly)
67116
if err != nil {
68117
return client, err
69118
}
70119
client.closePool = true
71120
return client, err
72121
}
73122

74-
func getPgConfig(cfg *Config) (*pgxpool.Config, int, error) {
75-
minConnections, maxConnections, numCopiers, err := cfg.GetNumConnections()
76-
if err != nil {
77-
log.Error("msg", "configuring number of connections", "err", err.Error())
78-
return nil, numCopiers, err
79-
}
80-
connectionStr := cfg.GetConnectionStr()
81-
pgConfig, err := pgxpool.ParseConfig(connectionStr)
123+
func (cfg *Config) getPgConfig(poolSize int) (*pgxpool.Config, error) {
124+
min := minPoolSize
125+
max := poolSize
126+
connStr := cfg.GetConnectionStr()
127+
128+
pgConfig, err := pgxpool.ParseConfig(connStr)
82129
if err != nil {
83130
log.Error("msg", "configuring connection", "err", err.Error())
84-
return nil, numCopiers, err
131+
return nil, err
85132
}
86133

87-
// Configure the number of connections and statement cache capacity.
88-
pgConfig.MinConns = int32(minConnections)
89-
pgConfig.MaxConns = int32(maxConnections)
134+
pgConfig.MaxConns = int32(max)
135+
pgConfig.MinConns = int32(min)
90136

91-
var statementCacheLog string
92-
if cfg.EnableStatementsCache {
93-
// Using the PGX default of 512 for statement cache capacity.
94-
statementCacheCapacity := 512
95-
pgConfig.AfterRelease = observeStatementCacheState
96-
statementCacheEnabled.Set(1)
97-
statementCacheCap.Set(float64(statementCacheCapacity))
98-
statementCacheLog = fmt.Sprintf("%d statements", statementCacheCapacity)
99-
} else {
100-
log.Info("msg", "Statements cached disabled, using simple protocol for database connections.")
137+
if !cfg.EnableStatementsCache {
101138
pgConfig.ConnConfig.PreferSimpleProtocol = true
102-
statementCacheEnabled.Set(0)
103-
statementCacheCap.Set(0)
104-
statementCacheLog = "disabled"
105-
106139
}
107-
log.Info("msg", getRedactedConnStr(connectionStr),
108-
"numCopiers", numCopiers,
109-
"pool_max_conns", maxConnections,
110-
"pool_min_conns", minConnections,
111-
"statement_cache", statementCacheLog,
112-
)
113-
return pgConfig, numCopiers, nil
140+
return pgConfig, nil
114141
}
115142

116143
func getRedactedConnStr(s string) string {
@@ -130,8 +157,7 @@ func getRedactedConnStr(s string) string {
130157
}
131158

132159
// NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
133-
func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
134-
dbConn := pgxconn.NewPgxConn(connPool)
160+
func NewClientWithPool(cfg *Config, numCopiers int, writerPool, readerPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
135161
sigClose := make(chan struct{})
136162
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
137163
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
@@ -143,40 +169,51 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
143169
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
144170
}
145171

146-
labelsReader := lreader.NewLabelsReader(dbConn, labelsCache, mt.ReadAuthorizer())
172+
readerConnPool := pgxconn.NewPgxConn(readerPool)
173+
connPools := []pgxconn.PgxConn{readerConnPool}
174+
labelsReader := lreader.NewLabelsReader(readerConnPool, labelsCache, mt.ReadAuthorizer())
147175
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)
148176

149-
dbQuerierConn := pgxconn.NewQueryLoggingPgxConn(connPool)
177+
dbQuerierConn := pgxconn.NewQueryLoggingPgxConn(readerPool)
150178
dbQuerier := querier.NewQuerier(dbQuerierConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
151179
queryable := query.NewQueryable(dbQuerier, labelsReader)
152180

153-
var dbIngestor *ingestor.DBIngestor
181+
var (
182+
dbIngestor *ingestor.DBIngestor
183+
)
154184
if !readOnly {
155185
var err error
156-
dbIngestor, err = ingestor.NewPgxIngestor(dbConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
186+
writerConnPool := pgxconn.NewPgxConn(writerPool)
187+
dbIngestor, err = ingestor.NewPgxIngestor(writerConnPool, metricsCache, seriesCache, exemplarKeyPosCache, &c)
157188
if err != nil {
158189
log.Error("msg", "err starting the ingestor", "err", err)
159190
return nil, err
160191
}
192+
connPools = append(connPools, writerConnPool)
161193
}
162194

163195
client := &Client{
164-
Connection: dbConn,
165-
QuerierConnection: dbQuerierConn,
166-
ingestor: dbIngestor,
167-
querier: dbQuerier,
168-
healthCheck: health.NewHealthChecker(dbConn),
169-
queryable: queryable,
170-
metricCache: metricsCache,
171-
labelsCache: labelsCache,
172-
seriesCache: seriesCache,
173-
sigClose: sigClose,
196+
readerConn: readerConnPool,
197+
ingestor: dbIngestor,
198+
querier: dbQuerier,
199+
healthCheck: health.NewHealthChecker(readerConnPool),
200+
queryable: queryable,
201+
metricCache: metricsCache,
202+
labelsCache: labelsCache,
203+
seriesCache: seriesCache,
204+
connPools: connPools,
205+
sigClose: sigClose,
174206
}
175207

176208
InitClientMetrics(client)
177209
return client, nil
178210
}
179211

212+
// Connection returns a pgxconn that is internally a reader connection pool.
213+
func (c *Client) Connection() pgxconn.PgxConn {
214+
return c.readerConn
215+
}
216+
180217
func (c *Client) InitPromQLEngine(cfg *query.Config) error {
181218
engine, err := query.NewEngine(log.GetLogger(), cfg.MaxQueryTimeout, cfg.LookBackDelta, cfg.SubQueryStepInterval, cfg.MaxSamples, cfg.EnabledFeatureMap)
182219
if err != nil {
@@ -198,7 +235,9 @@ func (c *Client) Close() {
198235
}
199236
close(c.sigClose)
200237
if c.closePool {
201-
c.Connection.Close()
238+
for i := range c.connPools {
239+
c.connPools[i].Close()
240+
}
202241
}
203242
if c.haService != nil {
204243
c.haService.Close()
@@ -270,19 +309,3 @@ func (c *Client) HealthCheck() error {
270309
func (c *Client) Queryable() promql.Queryable {
271310
return c.queryable
272311
}
273-
274-
func observeStatementCacheState(conn *pgx.Conn) bool {
275-
// connections have been opened and are released already
276-
// but the Client metrics have not been initialized yet
277-
if statementCacheLen == nil {
278-
return true
279-
}
280-
statementCache := conn.StatementCache()
281-
if statementCache == nil {
282-
return true
283-
}
284-
285-
statementCacheSize := statementCache.Len()
286-
statementCacheLen.Observe(float64(statementCacheSize))
287-
return true
288-
}

0 commit comments

Comments
 (0)