Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit de0b384

Browse files
committed
Adds support for reloader in Promscale.
Signed-off-by: Harkishen-Singh <[email protected]>
1 parent dd8f633 commit de0b384

File tree

4 files changed

+163
-7
lines changed

4 files changed

+163
-7
lines changed

pkg/api/reload.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// This file and its contents are licensed under the Apache License 2.0.
2+
// Please see the included NOTICE for copyright information and
3+
// LICENSE for a copy of the license.
4+
5+
package api
6+
7+
import (
8+
"fmt"
9+
"net/http"
10+
11+
"github.com/timescale/promscale/pkg/log"
12+
)
13+
14+
func Reload(reload chan<- chan error) http.HandlerFunc {
15+
return func(w http.ResponseWriter, r *http.Request) {
16+
errCh := make(chan error, 1)
17+
reload <- errCh
18+
if err := <-errCh; err != nil {
19+
log.Error("msg", "failed to reload", "err", err.Error())
20+
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusInternalServerError)
21+
}
22+
w.Header().Set("Content-Length", "0")
23+
}
24+
}

pkg/api/router.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/timescale/promscale/pkg/telemetry"
3030
)
3131

32-
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
32+
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload chan<- chan error) (*mux.Router, error) {
3333
var writePreprocessors []parser.Preprocessor
3434
if apiConf.HighAvailability {
3535
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
@@ -101,6 +101,9 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
101101
router.Path("/healthz").Methods(http.MethodGet, http.MethodOptions, http.MethodHead).HandlerFunc(Health(healthChecker))
102102
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)
103103

104+
reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload))
105+
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)
106+
104107
jaeger.ExtendQueryAPIs(router, client.Connection, query)
105108

106109
debugProf := router.PathPrefix("/debug/pprof").Subrouter()

pkg/reloader/reloader.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// This file and its contents are licensed under the Apache License 2.0.
2+
// Please see the included NOTICE for copyright information and
3+
// LICENSE for a copy of the license.
4+
5+
package reloader
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"os"
11+
"os/signal"
12+
"sync"
13+
"syscall"
14+
15+
"github.com/timescale/promscale/pkg/log"
16+
)
17+
18+
// service is a single reloadable unit registered with the loader.
type service struct {
	// name identifies the service in log lines and error messages.
	name string
	// payload performs the actual reload of the service. A non-nil error
	// aborts the reload run.
	payload func() error
}
22+
23+
type loader struct {
24+
ctx context.Context
25+
mux sync.Mutex
26+
services []service
27+
enableWebAdmin bool
28+
// Signal from an external routine. We accept an error channel to respond
29+
// to the http request in case some error happens during reloading.
30+
sigExt <-chan chan error
31+
}
32+
33+
// New creates a new thread-safe loader that can reloads a set of services
34+
// added through the Add() function.
35+
//
36+
// Reloading can be done by:
37+
// 1. SIGHUP. This is implemented by default
38+
// 2. External call. This happens on receiving a signal from the given sig
39+
//
40+
// Services for reloading can be added by Add(). When the reloading
41+
// starts, all services added are reloaded serially to ensure that
42+
// any dependent service is reloaded first. This also means that
43+
// each reload is a sync process and waits for the current service
44+
// to complete reloading before moving to the next.
45+
//
46+
// Note:
47+
// 1. All services are loaded once after calling Run()
48+
// 2. Reloading stops if any of the services give out an error while reloading.
49+
// This is important since the following dependent services might crash on reloading
50+
// if reloading their child does not succeed
51+
// 2. Loader becomes useless after a context cancel
52+
func New(ctx context.Context, enableWebAdmin bool, sig <-chan chan error) *loader {
53+
return &loader{
54+
ctx: ctx,
55+
enableWebAdmin: enableWebAdmin,
56+
services: make([]service, 0),
57+
sigExt: sig,
58+
}
59+
}
60+
61+
// Add adds the service to the list of reloadable services.
62+
// The given update func is called whenever a request to reload
63+
// is received.
64+
func (l *loader) Add(serviceName string, update func() error) {
65+
l.mux.Lock()
66+
defer l.mux.Unlock()
67+
l.services = append(l.services, service{serviceName, update})
68+
}
69+
70+
func (l *loader) Run() {
71+
sighup := make(chan os.Signal, 1)
72+
signal.Notify(sighup, syscall.SIGHUP)
73+
74+
for {
75+
select {
76+
case <-l.ctx.Done():
77+
l.services = nil // Make services GC-able.
78+
return
79+
case <-sighup:
80+
if err := l.reload(); err != nil {
81+
log.Error("msg", "stopping reload", "reason", err.Error())
82+
}
83+
case errCh := <-l.sigExt:
84+
if !l.enableWebAdmin {
85+
err := fmt.Errorf("reload received but web admin is disabled. To enable, start Promscale with '-web.enable-admin-api' flag")
86+
log.Error("msg", err.Error())
87+
errCh <- err
88+
continue
89+
}
90+
if err := l.reload(); err != nil {
91+
log.Error("msg", "stopping reload", "reason", err.Error())
92+
errCh <- err
93+
} else {
94+
errCh <- nil
95+
}
96+
}
97+
}
98+
}
99+
100+
func (l *loader) reload() error {
101+
l.mux.Lock()
102+
defer l.mux.Unlock()
103+
104+
// Reload services serially, in the order of their addition.
105+
for _, s := range l.services {
106+
log.Warn("msg", "reloading...", "service-name", s.name)
107+
if err := s.payload(); err != nil {
108+
return fmt.Errorf("error reloading '%s'", s.name)
109+
}
110+
}
111+
return nil
112+
}

pkg/runner/runner.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/google/uuid"
2727
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
28+
"github.com/pkg/errors"
2829

2930
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
3031
"github.com/prometheus/client_golang/prometheus"
@@ -36,6 +37,7 @@ import (
3637
"github.com/timescale/promscale/pkg/pgclient"
3738
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
3839
dbMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics/database"
40+
"github.com/timescale/promscale/pkg/reloader"
3941
"github.com/timescale/promscale/pkg/rules"
4042
"github.com/timescale/promscale/pkg/telemetry"
4143
"github.com/timescale/promscale/pkg/thanos"
@@ -138,8 +140,9 @@ func Run(cfg *Config) error {
138140
}
139141

140142
jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)
143+
reloadSig := make(chan chan error, 1)
141144

142-
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
145+
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery, reloadSig)
143146
if err != nil {
144147
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
145148
return fmt.Errorf("generate router: %w", err)
@@ -222,6 +225,10 @@ func Run(cfg *Config) error {
222225
},
223226
)
224227

228+
loadCtx, stopLoader := context.WithCancel(context.Background())
229+
defer stopLoader()
230+
loader := reloader.New(loadCtx, cfg.APICfg.AdminAPIEnabled, reloadSig)
231+
225232
pendingTelemetry := map[string]string{
226233
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
227234
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
@@ -243,15 +250,16 @@ func Run(cfg *Config) error {
243250
}
244251
cfg.APICfg.Rules = manager
245252

253+
loader.Add("rules-manager", func() error {
254+
promCfg := cfg.RulesCfg.PrometheusConfig
255+
return errors.WithMessage(manager.ApplyConfig(promCfg), "error applying Prometheus configuration to rules manager")
256+
})
257+
246258
group.Add(
247259
func() error {
248-
promCfg := cfg.RulesCfg.PrometheusConfig
249-
if err = manager.ApplyConfig(promCfg); err != nil {
250-
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
251-
}
252260
log.Info("msg", "Started Rule-Manager")
253261
return manager.Run()
254-
}, func(err error) {
262+
}, func(error) {
255263
log.Info("msg", "Stopping Rule-Manager")
256264
stopRuler()
257265
},
@@ -267,6 +275,15 @@ func Run(cfg *Config) error {
267275
}
268276
}()
269277

278+
group.Add(
279+
func() error {
280+
loader.Run()
281+
return nil
282+
}, func(error) {
283+
stopLoader()
284+
},
285+
)
286+
270287
mux := http.NewServeMux()
271288
mux.Handle("/", router)
272289

0 commit comments

Comments
 (0)