Skip to content

Commit a72f7b8

Browse files
authored
Merge pull request #132 from marevers/add-query-iterator
Support for iterating over query results
2 parents e69d565 + 55b6ef7 commit a72f7b8

File tree

5 files changed

+165
-11
lines changed

5 files changed

+165
-11
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ jobs:
132132
startup_sql:
133133
- 'SET lock_timeout = 1000'
134134
- 'SET idle_in_transaction_session_timeout = 100'
135+
# iterator is an optional mechanism to iterate over a series of values, e.g. multiple databases
136+
iterator:
137+
# sql is the SQL to execute to retrieve the list of values to iterate over -
138+
# query result must be a single column
139+
sql: 'SELECT database_name FROM databases'
140+
# placeholder should be present in the original query and not also used as an environment variable
141+
# e.g. {{PLACEHOLDER}} - it will be replaced by the values retrieved by the query
142+
placeholder: PLACEHOLDER
143+
# label is the label name to which the iterator value gets assigned
144+
label: database
135145
# queries is a map of Metric/Query mappings
136146
queries:
137147
# name is prefixed with sql_ and used as the metric name

config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ type Job struct {
152152
Connections []string `yaml:"connections"`
153153
Queries []*Query `yaml:"queries"`
154154
StartupSQL []string `yaml:"startup_sql"` // SQL executed on startup
155+
Iterator Iterator `yaml:"iterator"` // Iterator configuration
155156
}
156157

157158
type connection struct {
@@ -162,6 +163,7 @@ type connection struct {
162163
database string
163164
user string
164165
tokenExpirationTime time.Time
166+
iteratorValues []string
165167
}
166168

167169
// Query is an SQL query that is executed on a connection
@@ -180,3 +182,10 @@ type Query struct {
180182
Query string `yaml:"query"` // a literal query
181183
QueryRef string `yaml:"query_ref"` // references a query in the query map
182184
}
185+
186+
// Iterator is a mechanism to repeat queries from a job based on the results of another query
187+
type Iterator struct {
188+
SQL string `yaml:"sql"` // SQL to execute to retrieve iterator values
189+
Placeholder string `yaml:"placeholder"` // Placeholder in query to be replaced
190+
Label string `yaml:"label"` // Label to assign iterator values to
191+
}

job.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
8787
// try to satisfy prometheus naming restrictions
8888
name := MetricNameRE.ReplaceAllString("sql_"+q.Name, "")
8989
help := q.Help
90+
91+
// append the iterator label if it is set
92+
if j.Iterator.Label != "" {
93+
q.Labels = append(q.Labels, j.Iterator.Label)
94+
}
95+
9096
// prepare a new metrics descriptor
9197
//
9298
// the tricky part here is that the *order* of labels has to match the
@@ -428,6 +434,36 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
428434
return
429435
}
430436

437+
// execute iterator SQL
438+
if j.Iterator.SQL != "" {
439+
level.Debug(j.log).Log("msg", "IteratorSQL", "Query:", j.Iterator.SQL)
440+
rows, err := conn.conn.Queryx(j.Iterator.SQL)
441+
if err != nil {
442+
level.Warn(j.log).Log("msg", "Failed to run iterator query", "err", err, "host", conn.host)
443+
j.markFailed(conn)
444+
// we don't have the query name yet.
445+
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
446+
return
447+
}
448+
449+
defer rows.Close()
450+
451+
var ivs []string
452+
for rows.Next() {
453+
var value string
454+
err := rows.Scan(&value)
455+
if err != nil {
456+
level.Warn(j.log).Log("msg", "Failed to read iterator values", "err", err, "host", conn.host)
457+
j.markFailed(conn)
458+
// we don't have the query name yet.
459+
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
460+
return
461+
}
462+
ivs = append(ivs, value)
463+
}
464+
conn.iteratorValues = ivs
465+
}
466+
431467
for _, q := range j.Queries {
432468
if q == nil {
433469
continue
@@ -437,11 +473,21 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
437473
level.Warn(q.log).Log("msg", "Skipping query. Collector is nil")
438474
continue
439475
}
440-
level.Debug(q.log).Log("msg", "Running Query")
441-
// execute the query on the connection
442-
if err := q.Run(conn); err != nil {
443-
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
444-
continue
476+
// repeat query with iterator values if set and the query has the iterator placeholder
477+
if conn.iteratorValues != nil && q.HasIterator(j.Iterator.Placeholder) {
478+
level.Debug(q.log).Log("msg", "Running Iterator Query")
479+
// execute the query with iterator on the connection
480+
if err := q.RunIterator(conn, j.Iterator.Placeholder, conn.iteratorValues, j.Iterator.Label); err != nil {
481+
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
482+
continue
483+
}
484+
} else {
485+
level.Debug(q.log).Log("msg", "Running Query")
486+
// execute the query on the connection
487+
if err := q.Run(conn); err != nil {
488+
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
489+
continue
490+
}
445491
}
446492
level.Debug(q.log).Log("msg", "Query finished")
447493
updated++

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"github.com/go-kit/log"
1111
"github.com/go-kit/log/level"
1212
"github.com/prometheus/client_golang/prometheus"
13-
"github.com/prometheus/client_golang/prometheus/promhttp"
14-
"github.com/prometheus/common/version"
1513
prom_collectors_version "github.com/prometheus/client_golang/prometheus/collectors/version"
14+
"github.com/prometheus/client_golang/prometheus/promhttp"
15+
"github.com/prometheus/common/version"
1616
_ "go.uber.org/automaxprocs"
1717
)
1818

query.go

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"fmt"
55
"strconv"
6+
"strings"
67
"time"
78

89
"github.com/go-kit/log"
@@ -50,7 +51,7 @@ func (q *Query) Run(conn *connection) error {
5051
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
5152
continue
5253
}
53-
m, err := q.updateMetrics(conn, res)
54+
m, err := q.updateMetrics(conn, res, "", "")
5455
if err != nil {
5556
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
5657
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
@@ -77,8 +78,90 @@ func (q *Query) Run(conn *connection) error {
7778
return nil
7879
}
7980

81+
// RunIterator runs the query for each iterator value on a single connection
82+
func (q *Query) RunIterator(conn *connection, ph string, ivs []string, il string) error {
83+
if q.log == nil {
84+
q.log = log.NewNopLogger()
85+
}
86+
queryCounter.WithLabelValues(q.jobName, q.Name).Inc()
87+
if q.desc == nil {
88+
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
89+
return fmt.Errorf("metrics descriptor is nil")
90+
}
91+
if q.Query == "" {
92+
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
93+
return fmt.Errorf("query is empty")
94+
}
95+
if conn == nil || conn.conn == nil {
96+
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
97+
return fmt.Errorf("db connection not initialized (should not happen)")
98+
}
99+
100+
// execute query for each iterator value
101+
now := time.Now()
102+
metrics := make([]prometheus.Metric, 0, len(q.metrics))
103+
updated := 0
104+
for _, iv := range ivs {
105+
rows, err := conn.conn.Queryx(q.ReplaceIterator(ph, iv))
106+
if err != nil {
107+
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
108+
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
109+
return err
110+
}
111+
defer rows.Close()
112+
113+
for rows.Next() {
114+
res := make(map[string]interface{})
115+
err := rows.MapScan(res)
116+
if err != nil {
117+
level.Error(q.log).Log("msg", "Failed to scan", "err", err, "host", conn.host, "db", conn.database)
118+
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
119+
continue
120+
}
121+
m, err := q.updateMetrics(conn, res, iv, il)
122+
if err != nil {
123+
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
124+
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
125+
continue
126+
}
127+
metrics = append(metrics, m...)
128+
updated++
129+
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
130+
}
131+
}
132+
133+
duration := time.Since(now)
134+
queryDurationHistogram.WithLabelValues(q.jobName, q.Name).Observe(duration.Seconds())
135+
136+
if updated < 1 {
137+
if q.AllowZeroRows {
138+
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
139+
} else {
140+
return fmt.Errorf("zero rows returned")
141+
}
142+
}
143+
144+
// update the metrics cache
145+
q.Lock()
146+
q.metrics[conn] = metrics
147+
q.Unlock()
148+
149+
return nil
150+
}
151+
152+
// HasIterator returns true if the query contains the given placeholder
153+
func (q *Query) HasIterator(ph string) bool {
154+
return strings.Contains(q.Query, ph)
155+
}
156+
157+
// ReplaceIterator replaces a given placeholder with an iterator value and returns a new query
158+
func (q *Query) ReplaceIterator(ph string, iv string) string {
159+
iteratorReplacer := strings.NewReplacer(fmt.Sprint("{{", ph, "}}"), iv)
160+
return iteratorReplacer.Replace(q.Query)
161+
}
162+
80163
// updateMetrics parses the result set and returns a slice of const metrics
81-
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]prometheus.Metric, error) {
164+
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}, iv string, il string) ([]prometheus.Metric, error) {
82165
// if no value were defined to be parsed, return immediately
83166
if len(q.Values) == 0 {
84167
level.Debug(q.log).Log("msg", "No values defined in configuration, skipping metric update")
@@ -87,7 +170,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
87170
updated := 0
88171
metrics := make([]prometheus.Metric, 0, len(q.Values))
89172
for _, valueName := range q.Values {
90-
m, err := q.updateMetric(conn, res, valueName)
173+
m, err := q.updateMetric(conn, res, valueName, iv, il)
91174
if err != nil {
92175
level.Error(q.log).Log(
93176
"msg", "Failed to update metric",
@@ -108,7 +191,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
108191
}
109192

110193
// updateMetrics parses a single row and returns a const metric
111-
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string) (prometheus.Metric, error) {
194+
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string, iv string, il string) (prometheus.Metric, error) {
112195
var value float64
113196
if i, ok := res[valueName]; ok {
114197
switch f := i.(type) {
@@ -154,6 +237,12 @@ func (q *Query) updateMetric(conn *connection, res map[string]interface{}, value
154237
// added below
155238
labels := make([]string, 0, len(q.Labels)+5)
156239
for _, label := range q.Labels {
240+
// append iterator value to the labels
241+
if label == il && iv != "" {
242+
labels = append(labels, iv)
243+
continue
244+
}
245+
157246
// we need to fill every spot in the slice or the key->value mapping
158247
// won't match up in the end.
159248
//

0 commit comments

Comments
 (0)