Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 48a326e

Browse files
Adds support for reloader in Promscale.
Signed-off-by: Harkishen-Singh <[email protected]>
1 parent dd8f633 commit 48a326e

File tree

6 files changed

+175
-8
lines changed

6 files changed

+175
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ We use the following categories for changes:
1919
- `-enable-feature=promql-per-step-stats` feature for statistics in PromQL evaluation
2020
- Add `readinessProbe` in helm chart [#1266]
2121
- Telemetry for recording rules and alerting [#1424]
22+
- Ability to reload rules and alerting config [#1426]
2223

2324
### Fixed
2425
- Trace query returns empty result when queried with

pkg/api/reload.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// This file and its contents are licensed under the Apache License 2.0.
2+
// Please see the included NOTICE for copyright information and
3+
// LICENSE for a copy of the license.
4+
5+
package api
6+
7+
import (
8+
"fmt"
9+
"net/http"
10+
11+
"github.com/timescale/promscale/pkg/log"
12+
)
13+
14+
func Reload(reload chan<- chan error) http.HandlerFunc {
15+
return func(w http.ResponseWriter, r *http.Request) {
16+
errCh := make(chan error, 1)
17+
// Inline with what Prometheus does for reloading API.
18+
// https://github.com/prometheus/prometheus/blob/e166cf402a9d2b838d5fba72d5b60117e9265898/web/web.go#L774
19+
reload <- errCh
20+
if err := <-errCh; err != nil {
21+
log.Error("msg", "failed to reload", "err", err.Error())
22+
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusInternalServerError)
23+
}
24+
w.Header().Set("Content-Length", "0")
25+
}
26+
}

pkg/api/router.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/timescale/promscale/pkg/telemetry"
3030
)
3131

32-
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
32+
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload chan<- chan error) (*mux.Router, error) {
3333
var writePreprocessors []parser.Preprocessor
3434
if apiConf.HighAvailability {
3535
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
@@ -101,6 +101,9 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
101101
router.Path("/healthz").Methods(http.MethodGet, http.MethodOptions, http.MethodHead).HandlerFunc(Health(healthChecker))
102102
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)
103103

104+
reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload))
105+
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)
106+
104107
jaeger.ExtendQueryAPIs(router, client.Connection, query)
105108

106109
debugProf := router.PathPrefix("/debug/pprof").Subrouter()

pkg/reloader/reloader.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// This file and its contents are licensed under the Apache License 2.0.
2+
// Please see the included NOTICE for copyright information and
3+
// LICENSE for a copy of the license.
4+
5+
package reloader
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"os"
11+
"os/signal"
12+
"sync"
13+
"syscall"
14+
15+
"github.com/timescale/promscale/pkg/log"
16+
)
17+
18+
type service struct {
19+
name string
20+
payload func() error
21+
}
22+
23+
type loader struct {
24+
ctx context.Context
25+
mux sync.Mutex
26+
services []service
27+
webAdmin bool
28+
// Signal from an external routine. We accept an error channel to respond
29+
// to the http request in case some error happens during reloading.
30+
sigExt <-chan chan error
31+
}
32+
33+
// New creates a new thread-safe loader that can reloads a set of services
34+
// added through the Add() function.
35+
//
36+
// Reloading can be done by:
37+
// 1. SIGHUP. This is implemented by default
38+
// 2. External call. This happens on receiving a signal from the given sig.
39+
// The loader responds to an error channel after reloading,
40+
// indicating whether the reload was successful or not.
41+
//
42+
// Services for reloading can be added by Add(). When the reloading
43+
// starts, all services added are reloaded serially to ensure that
44+
// any dependent service is reloaded first. This also means that
45+
// each reload is a sync process and waits for the current service
46+
// to complete reloading before moving to the next.
47+
//
48+
// Note:
49+
// 1. All services are loaded once after calling Run()
50+
// 2. Reloading stops if any of the services give out an error while reloading.
51+
// This is important since the following dependent services might crash on reloading
52+
// if reloading their child do not succeed
53+
// 2. Loader becomes useless after a context cancel
54+
func New(ctx context.Context, webAdmin bool, sig <-chan chan error) *loader {
55+
return &loader{
56+
ctx: ctx,
57+
webAdmin: webAdmin,
58+
services: make([]service, 0),
59+
sigExt: sig,
60+
}
61+
}
62+
63+
// Add adds the service to the list of reloadable services.
64+
// The given update func is called whenever a request to reload
65+
// is received.
66+
func (l *loader) Add(serviceName string, update func() error) {
67+
l.mux.Lock()
68+
defer l.mux.Unlock()
69+
l.services = append(l.services, service{serviceName, update})
70+
}
71+
72+
func (l *loader) Run() error {
73+
sighup := make(chan os.Signal, 1)
74+
signal.Notify(sighup, syscall.SIGHUP)
75+
76+
if err := l.reload(); err != nil {
77+
return fmt.Errorf("error reloading: %w", err)
78+
}
79+
for {
80+
select {
81+
case <-l.ctx.Done():
82+
l.services = nil // Make services GC-able.
83+
return nil
84+
case <-sighup:
85+
if err := l.reload(); err != nil {
86+
log.Error("msg", "stopping reload", "reason", err.Error())
87+
}
88+
case errCh := <-l.sigExt:
89+
if !l.webAdmin {
90+
// Reloads via external calls are secured by web-admin.
91+
err := fmt.Errorf("reload received but web admin is disabled. To enable, start Promscale with '-web.enable-admin-api' flag")
92+
log.Error("msg", err.Error())
93+
errCh <- err
94+
continue
95+
}
96+
if err := l.reload(); err != nil {
97+
log.Error("msg", "stopping reload", "reason", err.Error())
98+
errCh <- err
99+
} else {
100+
errCh <- nil
101+
}
102+
}
103+
}
104+
}
105+
106+
func (l *loader) reload() error {
107+
l.mux.Lock()
108+
defer l.mux.Unlock()
109+
110+
// Reload services serially, in the order of their addition.
111+
for _, s := range l.services {
112+
log.Debug("msg", "reloading...", "service-name", s.name)
113+
if err := s.payload(); err != nil {
114+
return fmt.Errorf("error reloading '%s'", s.name)
115+
}
116+
}
117+
return nil
118+
}

pkg/runner/runner.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/google/uuid"
2727
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
28+
"github.com/pkg/errors"
2829

2930
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
3031
"github.com/prometheus/client_golang/prometheus"
@@ -36,6 +37,7 @@ import (
3637
"github.com/timescale/promscale/pkg/pgclient"
3738
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
3839
dbMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics/database"
40+
"github.com/timescale/promscale/pkg/reloader"
3941
"github.com/timescale/promscale/pkg/rules"
4042
"github.com/timescale/promscale/pkg/telemetry"
4143
"github.com/timescale/promscale/pkg/thanos"
@@ -138,8 +140,9 @@ func Run(cfg *Config) error {
138140
}
139141

140142
jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)
143+
reloadSig := make(chan chan error, 1)
141144

142-
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
145+
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery, reloadSig)
143146
if err != nil {
144147
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
145148
return fmt.Errorf("generate router: %w", err)
@@ -222,6 +225,10 @@ func Run(cfg *Config) error {
222225
},
223226
)
224227

228+
loadCtx, stopLoader := context.WithCancel(context.Background())
229+
defer stopLoader()
230+
loader := reloader.New(loadCtx, cfg.APICfg.AdminAPIEnabled, reloadSig)
231+
225232
pendingTelemetry := map[string]string{
226233
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
227234
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
@@ -243,15 +250,19 @@ func Run(cfg *Config) error {
243250
}
244251
cfg.APICfg.Rules = manager
245252

253+
loader.Add("rules-manager", func() error {
254+
err := rules.Validate(&cfg.RulesCfg) // This refreshes the cfg.RulesCfg.PrometheusConfig entry in RulesCfg.
255+
if err != nil {
256+
return fmt.Errorf("error validating rules-config: %w", err)
257+
}
258+
return errors.WithMessage(manager.ApplyConfig(cfg.RulesCfg.PrometheusConfig), "error applying Prometheus configuration to rules manager")
259+
})
260+
246261
group.Add(
247262
func() error {
248-
promCfg := cfg.RulesCfg.PrometheusConfig
249-
if err = manager.ApplyConfig(promCfg); err != nil {
250-
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
251-
}
252263
log.Info("msg", "Started Rule-Manager")
253264
return manager.Run()
254-
}, func(err error) {
265+
}, func(error) {
255266
log.Info("msg", "Stopping Rule-Manager")
256267
stopRuler()
257268
},
@@ -267,6 +278,14 @@ func Run(cfg *Config) error {
267278
}
268279
}()
269280

281+
group.Add(
282+
func() error {
283+
return loader.Run()
284+
}, func(error) {
285+
stopLoader()
286+
},
287+
)
288+
270289
mux := http.NewServeMux()
271290
mux.Handle("/", router)
272291

pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,
324324

325325
jaegerQuery := jaegerquery.New(pgClient.QuerierConnection, &jaegerquery.DefaultConfig)
326326

327-
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery)
327+
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery, nil)
328328
if err != nil {
329329
return nil, nil, fmt.Errorf("generate router: %w", err)
330330
}

0 commit comments

Comments
 (0)