Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use the following categories for changes:
- Telemetry for recording rules and alerting [#1424]
- Set number of ingest copiers to the number of DB CPUs [#1387]
- Telemetry for helm chart installations [#1429]
- Ability to reload rules and alerting config [#1426]

### Fixed
- Trace query returns empty result when queried with
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package api

import (
"fmt"
"net/http"

"github.com/timescale/promscale/pkg/log"
)

func Reload(reload func() error, webAdmin bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !webAdmin {
err := fmt.Errorf("reload received but web admin is disabled. To enable, start Promscale with '-web.enable-admin-api' flag")
log.Error("msg", err.Error())
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusUnauthorized)
return
}
if err := reload(); err != nil {
log.Error("msg", "failed to reload", "err", err.Error())
http.Error(w, fmt.Errorf("failed to reload: %w", err).Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Length", "0")
}
}
5 changes: 4 additions & 1 deletion pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/timescale/promscale/pkg/telemetry"
)

func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload func() error) (*mux.Router, error) {
var writePreprocessors []parser.Preprocessor
if apiConf.HighAvailability {
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
Expand Down Expand Up @@ -101,6 +101,9 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
router.Path("/healthz").Methods(http.MethodGet, http.MethodOptions, http.MethodHead).HandlerFunc(Health(healthChecker))
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)

reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload, apiConf.AdminAPIEnabled))
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)

jaeger.ExtendQueryAPIs(router, client.Connection, query)

debugProf := router.PathPrefix("/debug/pprof").Subrouter()
Expand Down
20 changes: 16 additions & 4 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Manager struct {
postRulesProcessing prom_rules.RuleGroupPostProcessFunc
}

func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, error) {
func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, func() error, error) {
discoveryManagerNotify := discovery.NewManager(ctx, log.GetLogger(), discovery.Name("notify"))

notifierManager := notifier.NewManager(&notifier.Options{
Expand All @@ -44,7 +44,7 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
// For the moment, we do not have any external UI url, hence we provide an empty one.
parsedUrl, err := url.Parse("")
if err != nil {
return nil, fmt.Errorf("parsing UI-URL: %w", err)
return nil, nil, fmt.Errorf("parsing UI-URL: %w", err)
}

rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
Expand All @@ -60,12 +60,24 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
})
return &Manager{

manager := &Manager{
ctx: ctx,
rulesManager: rulesManager,
notifierManager: notifierManager,
discoveryManager: discoveryManagerNotify,
}, nil
}
return manager, manager.getReloader(cfg), nil
}

func (m *Manager) getReloader(cfg *Config) func() error {
return func() error {
err := Validate(cfg) // This refreshes the RulesCfg.PrometheusConfig entry in RulesCfg after reading the PrometheusConfigAddress.
if err != nil {
return fmt.Errorf("error validating rules-config: %w", err)
}
return errors.WithMessage(m.ApplyConfig(cfg.PrometheusConfig), "error applying config")
}
}

func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {
Expand Down
102 changes: 58 additions & 44 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"os/signal"
"syscall"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
Expand All @@ -25,7 +26,6 @@ import (

"github.com/google/uuid"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -137,9 +137,52 @@ func Run(cfg *Config) error {
}
}

var (
group run.Group
reloadRules func() error
)
pendingTelemetry := map[string]string{
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
}
if cfg.RulesCfg.ContainsRules() {
pendingTelemetry["rules_enabled"] = "true"
if cfg.RulesCfg.ContainsAlertingConfig() {
pendingTelemetry["alerting_enabled"] = "true"
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
}

rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

var manager *rules.Manager
manager, reloadRules, err = rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

if err := reloadRules(); err != nil {
return err
}

group.Add(
func() error {
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)
} else {
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
}

jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)

router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery, reloadRules)
if err != nil {
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
return fmt.Errorf("generate router: %w", err)
Expand All @@ -149,7 +192,6 @@ func Run(cfg *Config) error {
telemetryEngine.Start()
defer telemetryEngine.Stop()

var group run.Group
if len(cfg.ThanosStoreAPIListenAddr) > 0 {
srv := thanos.NewStorage(client.Queryable())
options := make([]grpc.ServerOption, 0)
Expand Down Expand Up @@ -222,44 +264,6 @@ func Run(cfg *Config) error {
},
)

pendingTelemetry := map[string]string{
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
}
if cfg.RulesCfg.ContainsRules() {
pendingTelemetry["rules_enabled"] = "true"
if cfg.RulesCfg.ContainsAlertingConfig() {
pendingTelemetry["alerting_enabled"] = "true"
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
}

rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, err := rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

group.Add(
func() error {
promCfg := cfg.RulesCfg.PrometheusConfig
if err = manager.ApplyConfig(promCfg); err != nil {
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
}
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(err error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)
} else {
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
}

// Asynchronously update the telemetry information.
go func() {
for k, v := range pendingTelemetry {
Expand Down Expand Up @@ -296,11 +300,21 @@ func Run(cfg *Config) error {

// Listen to OS interrupt signals.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGINT, syscall.SIGHUP)
group.Add(
func() error {
<-c
return nil
for {
switch <-c {
case syscall.SIGINT:
return nil
case syscall.SIGHUP:
if err := reloadRules(); err != nil {
log.Error("msg", "error reloading rules", "err", err.Error())
continue
}
log.Debug("msg", "success reloading rules")
}
}
}, func(err error) {
close(c)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,

jaegerQuery := jaegerquery.New(pgClient.QuerierConnection, &jaegerquery.DefaultConfig)

router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery)
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery, nil)
if err != nil {
return nil, nil, fmt.Errorf("generate router: %w", err)
}
Expand Down
27 changes: 20 additions & 7 deletions pkg/tests/end_to_end_tests/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"github.com/timescale/promscale/pkg/tenancy"
)

const RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
const (
RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
EmptyRecordingRulesConfigPath = "../testdata/rules/config.empty_rules.yaml"
)

func TestRecordingRulesEval(t *testing.T) {
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
Expand Down Expand Up @@ -56,22 +59,27 @@ func TestRecordingRulesEval(t *testing.T) {
OutageTolerance: rules.DefaultOutageTolerance,
ForGracePeriod: rules.DefaultForGracePeriod,
ResendDelay: rules.DefaultResendDelay,
PrometheusConfigAddress: RecordingRulesEvalConfigPath,
PrometheusConfigAddress: EmptyRecordingRulesConfigPath, // Start with empty rules.
}
require.NoError(t, rules.Validate(rulesCfg))
require.True(t, rulesCfg.ContainsRules())
require.False(t, rulesCfg.ContainsRules())

ruleCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
manager, reloadRules, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
require.NoError(t, err)

require.NotNil(t, rulesCfg.PrometheusConfig)
require.NoError(t, reloadRules())
require.False(t, rulesCfg.ContainsRules())

ruleGroups := manager.RuleGroups()
require.Equal(t, 0, len(ruleGroups))

manager.WithPostRulesProcess(func(*prom_rules.Group, time.Time, log.Logger) error {
defer func() {
stopRuler() // Cancels the context so that the blocking manager.Run() is released when the test finishes.
stopRuler() // Shuts down the manager.Run() as soon as the test completes.
}()
// Check if recording rule as a metric exists in metric catalog table.
var exists bool
Expand All @@ -88,8 +96,13 @@ func TestRecordingRulesEval(t *testing.T) {

return nil
})
require.NoError(t, manager.ApplyConfig(rulesCfg.PrometheusConfig))
require.NoError(t, manager.Run(), "error running rules manager") // This is blocking. It will be released after stopRuler() in defer func.
// Reload with configuration file that contains some rules.
rulesCfg.PrometheusConfigAddress = RecordingRulesEvalConfigPath
require.NoError(t, reloadRules())
require.True(t, rulesCfg.ContainsRules())
require.Equal(t, 1, len(manager.RuleGroups()))

require.NoError(t, manager.Run(), "error running rules manager")
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/tests/testdata/rules/config.empty_rules.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty file indicates recording rules are not applied.