Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit aebccfd

Browse files
committed
Adds support for reloader in Promscale.
Signed-off-by: Harkishen-Singh <[email protected]>
1 parent a6d66c8 commit aebccfd

File tree

8 files changed

+130
-57
lines changed

8 files changed

+130
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ We use the following categories for changes:
2121
- Telemetry for recording rules and alerting [#1424]
2222
- Set number of ingest copiers to the number of DB CPUs [#1387]
2323
- Telemetry for helm chart installations [#1429]
24+
- Ability to reload rules and alerting config [#1426]
2425

2526
### Fixed
2627
- Trace query returns empty result when queried with

pkg/api/reload.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// This file and its contents are licensed under the Apache License 2.0.
2+
// Please see the included NOTICE for copyright information and
3+
// LICENSE for a copy of the license.
4+
5+
package api
6+
7+
import (
8+
"fmt"
9+
"net/http"
10+
11+
"github.com/timescale/promscale/pkg/log"
12+
)
13+
14+
func Reload(reload func() error, webAdmin bool) http.HandlerFunc {
15+
return func(w http.ResponseWriter, r *http.Request) {
16+
if !webAdmin {
17+
err := fmt.Errorf("reload received but web admin is disabled. To enable, start Promscale with '-web.enable-admin-api' flag")
18+
log.Error("msg", err.Error())
19+
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusUnauthorized)
20+
return
21+
}
22+
if err := reload(); err != nil {
23+
log.Error("msg", "failed to reload", "err", err.Error())
24+
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusInternalServerError)
25+
return
26+
}
27+
w.Header().Set("Content-Length", "0")
28+
}
29+
}

pkg/api/router.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/timescale/promscale/pkg/telemetry"
3030
)
3131

32-
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
32+
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload func() error) (*mux.Router, error) {
3333
var writePreprocessors []parser.Preprocessor
3434
if apiConf.HighAvailability {
3535
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
@@ -101,6 +101,9 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
101101
router.Path("/healthz").Methods(http.MethodGet, http.MethodOptions, http.MethodHead).HandlerFunc(Health(healthChecker))
102102
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)
103103

104+
reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload, apiConf.AdminAPIEnabled))
105+
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)
106+
104107
jaeger.ExtendQueryAPIs(router, client.Connection, query)
105108

106109
debugProf := router.PathPrefix("/debug/pprof").Subrouter()

pkg/rules/rules.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Manager struct {
3232
postRulesProcessing prom_rules.RuleGroupPostProcessFunc
3333
}
3434

35-
func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, error) {
35+
func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, func() error, error) {
3636
discoveryManagerNotify := discovery.NewManager(ctx, log.GetLogger(), discovery.Name("notify"))
3737

3838
notifierManager := notifier.NewManager(&notifier.Options{
@@ -44,7 +44,7 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
4444
// For the moment, we do not have any external UI url, hence we provide an empty one.
4545
parsedUrl, err := url.Parse("")
4646
if err != nil {
47-
return nil, fmt.Errorf("parsing UI-URL: %w", err)
47+
return nil, nil, fmt.Errorf("parsing UI-URL: %w", err)
4848
}
4949

5050
rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
@@ -60,12 +60,24 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
6060
ForGracePeriod: cfg.ForGracePeriod,
6161
ResendDelay: cfg.ResendDelay,
6262
})
63-
return &Manager{
63+
64+
manager := &Manager{
6465
ctx: ctx,
6566
rulesManager: rulesManager,
6667
notifierManager: notifierManager,
6768
discoveryManager: discoveryManagerNotify,
68-
}, nil
69+
}
70+
return manager, manager.getReloader(cfg), nil
71+
}
72+
73+
func (m *Manager) getReloader(cfg *Config) func() error {
74+
return func() error {
75+
err := Validate(cfg) // This refreshes the RulesCfg.PrometheusConfig entry in RulesCfg after reading the PrometheusConfigAddress.
76+
if err != nil {
77+
return fmt.Errorf("error validating rules-config: %w", err)
78+
}
79+
return errors.WithMessage(m.ApplyConfig(cfg.PrometheusConfig), "error applying config")
80+
}
6981
}
7082

7183
func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {

pkg/runner/runner.go

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"os"
1616
"os/signal"
17+
"syscall"
1718
"time"
1819

1920
_ "github.com/jackc/pgx/v4/stdlib"
@@ -25,7 +26,6 @@ import (
2526

2627
"github.com/google/uuid"
2728
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
28-
2929
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
3030
"github.com/prometheus/client_golang/prometheus"
3131
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -137,9 +137,52 @@ func Run(cfg *Config) error {
137137
}
138138
}
139139

140+
var (
141+
group run.Group
142+
reloadRules func() error
143+
)
144+
pendingTelemetry := map[string]string{
145+
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
146+
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
147+
}
148+
if cfg.RulesCfg.ContainsRules() {
149+
pendingTelemetry["rules_enabled"] = "true"
150+
if cfg.RulesCfg.ContainsAlertingConfig() {
151+
pendingTelemetry["alerting_enabled"] = "true"
152+
} else {
153+
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
154+
}
155+
156+
rulesCtx, stopRuler := context.WithCancel(context.Background())
157+
defer stopRuler()
158+
159+
var manager *rules.Manager
160+
manager, reloadRules, err = rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
161+
if err != nil {
162+
return fmt.Errorf("error creating rules manager: %w", err)
163+
}
164+
cfg.APICfg.Rules = manager
165+
166+
if err := reloadRules(); err != nil {
167+
return err
168+
}
169+
170+
group.Add(
171+
func() error {
172+
log.Info("msg", "Started Rule-Manager")
173+
return manager.Run()
174+
}, func(error) {
175+
log.Info("msg", "Stopping Rule-Manager")
176+
stopRuler()
177+
},
178+
)
179+
} else {
180+
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
181+
}
182+
140183
jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)
141184

142-
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
185+
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery, reloadRules)
143186
if err != nil {
144187
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
145188
return fmt.Errorf("generate router: %w", err)
@@ -149,7 +192,6 @@ func Run(cfg *Config) error {
149192
telemetryEngine.Start()
150193
defer telemetryEngine.Stop()
151194

152-
var group run.Group
153195
if len(cfg.ThanosStoreAPIListenAddr) > 0 {
154196
srv := thanos.NewStorage(client.Queryable())
155197
options := make([]grpc.ServerOption, 0)
@@ -222,44 +264,6 @@ func Run(cfg *Config) error {
222264
},
223265
)
224266

225-
pendingTelemetry := map[string]string{
226-
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
227-
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
228-
}
229-
if cfg.RulesCfg.ContainsRules() {
230-
pendingTelemetry["rules_enabled"] = "true"
231-
if cfg.RulesCfg.ContainsAlertingConfig() {
232-
pendingTelemetry["alerting_enabled"] = "true"
233-
} else {
234-
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
235-
}
236-
237-
rulesCtx, stopRuler := context.WithCancel(context.Background())
238-
defer stopRuler()
239-
240-
manager, err := rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
241-
if err != nil {
242-
return fmt.Errorf("error creating rules manager: %w", err)
243-
}
244-
cfg.APICfg.Rules = manager
245-
246-
group.Add(
247-
func() error {
248-
promCfg := cfg.RulesCfg.PrometheusConfig
249-
if err = manager.ApplyConfig(promCfg); err != nil {
250-
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
251-
}
252-
log.Info("msg", "Started Rule-Manager")
253-
return manager.Run()
254-
}, func(err error) {
255-
log.Info("msg", "Stopping Rule-Manager")
256-
stopRuler()
257-
},
258-
)
259-
} else {
260-
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
261-
}
262-
263267
// Asynchronously update the telemetry information.
264268
go func() {
265269
for k, v := range pendingTelemetry {
@@ -296,11 +300,21 @@ func Run(cfg *Config) error {
296300

297301
// Listen to OS interrupt signals.
298302
c := make(chan os.Signal, 1)
299-
signal.Notify(c, os.Interrupt)
303+
signal.Notify(c, syscall.SIGINT, syscall.SIGHUP)
300304
group.Add(
301305
func() error {
302-
<-c
303-
return nil
306+
for {
307+
switch <-c {
308+
case syscall.SIGINT:
309+
return nil
310+
case syscall.SIGHUP:
311+
if err := reloadRules(); err != nil {
312+
log.Error("msg", "error reloading rules", "err", err.Error())
313+
continue
314+
}
315+
log.Debug("msg", "success reloading rules")
316+
}
317+
}
304318
}, func(err error) {
305319
close(c)
306320
},

pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,
324324

325325
jaegerQuery := jaegerquery.New(pgClient.QuerierConnection, &jaegerquery.DefaultConfig)
326326

327-
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery)
327+
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery, nil)
328328
if err != nil {
329329
return nil, nil, fmt.Errorf("generate router: %w", err)
330330
}

pkg/tests/end_to_end_tests/rules_test.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import (
2323
"github.com/timescale/promscale/pkg/tenancy"
2424
)
2525

26-
const RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
26+
const (
27+
RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
28+
EmptyRecordingRulesConfigPath = "../testdata/rules/config.empty_rules.yaml"
29+
)
2730

2831
func TestRecordingRulesEval(t *testing.T) {
2932
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
@@ -56,22 +59,27 @@ func TestRecordingRulesEval(t *testing.T) {
5659
OutageTolerance: rules.DefaultOutageTolerance,
5760
ForGracePeriod: rules.DefaultForGracePeriod,
5861
ResendDelay: rules.DefaultResendDelay,
59-
PrometheusConfigAddress: RecordingRulesEvalConfigPath,
62+
PrometheusConfigAddress: EmptyRecordingRulesConfigPath, // Start with empty rules.
6063
}
6164
require.NoError(t, rules.Validate(rulesCfg))
62-
require.True(t, rulesCfg.ContainsRules())
65+
require.False(t, rulesCfg.ContainsRules())
6366

6467
ruleCtx, stopRuler := context.WithCancel(context.Background())
6568
defer stopRuler()
6669

67-
manager, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
70+
manager, reloadRules, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
6871
require.NoError(t, err)
6972

7073
require.NotNil(t, rulesCfg.PrometheusConfig)
74+
require.NoError(t, reloadRules())
75+
require.False(t, rulesCfg.ContainsRules())
76+
77+
ruleGroups := manager.RuleGroups()
78+
require.Equal(t, 0, len(ruleGroups))
7179

7280
manager.WithPostRulesProcess(func(*prom_rules.Group, time.Time, log.Logger) error {
7381
defer func() {
74-
stopRuler() // Cancels the context so that the blocking manager.Run() is released when the test finishes.
82+
stopRuler() // Shuts down the manager.Run() as soon as the test completes.
7583
}()
7684
// Check if recording rule as a metric exists in metric catalog table.
7785
var exists bool
@@ -88,8 +96,13 @@ func TestRecordingRulesEval(t *testing.T) {
8896

8997
return nil
9098
})
91-
require.NoError(t, manager.ApplyConfig(rulesCfg.PrometheusConfig))
92-
require.NoError(t, manager.Run(), "error running rules manager") // This is blocking. It will be released after stopRuler() in defer func.
99+
// Reload with configuration file that contains some rules.
100+
rulesCfg.PrometheusConfigAddress = RecordingRulesEvalConfigPath
101+
require.NoError(t, reloadRules())
102+
require.True(t, rulesCfg.ContainsRules())
103+
require.Equal(t, 1, len(manager.RuleGroups()))
104+
105+
require.NoError(t, manager.Run(), "error running rules manager")
93106
})
94107
}
95108

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Empty file indicates recording rules are not applied.

0 commit comments

Comments
 (0)