Skip to content

Commit 02eff41

Browse files
Add a rate limit on logger (#352)
* Add a rate limit on logger Signed-off-by: Yuri Nikolic <[email protected]> * Fixing review findings Signed-off-by: Yuri Nikolic <[email protected]> * Fixing review findings 2 Signed-off-by: Yuri Nikolic <[email protected]> --------- Signed-off-by: Yuri Nikolic <[email protected]>
1 parent e772133 commit 02eff41

File tree

3 files changed

+322
-0
lines changed

3 files changed

+322
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
5454
* [FEATURE] Add `log.BufferedLogger` type. #338
5555
* [FEATURE] Add `flagext.ParseFlagsAndArguments()` and `flagext.ParseFlagsWithoutArguments()` utilities. #341
56+
* [FEATURE] Add `log.RateLimitedLogger` for limiting the rate of logging. The `logger_rate_limit_discarded_log_lines_total` metrics traces the total number of discarded log lines per level. #352
5657
* [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients.
5758
* [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274
5859
* [ENHANCEMENT] Add middleware package. #38

log/ratelimit.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package log
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promauto"
6+
"golang.org/x/time/rate"
7+
)
8+
9+
const (
10+
infoLevel = "info"
11+
debugLevel = "debug"
12+
warnLevel = "warning"
13+
errorLevel = "error"
14+
)
15+
16+
type RateLimitedLogger struct {
17+
next Interface
18+
limiter *rate.Limiter
19+
20+
discardedInfoLogLinesCounter prometheus.Counter
21+
discardedDebugLogLinesCounter prometheus.Counter
22+
discardedWarnLogLinesCounter prometheus.Counter
23+
discardedErrorLogLinesCounter prometheus.Counter
24+
}
25+
26+
// NewRateLimitedLogger returns a logger.Interface that is limited to the given number of logs per second,
27+
// with the given burst size.
28+
func NewRateLimitedLogger(logger Interface, logsPerSecond rate.Limit, burstSize int, reg prometheus.Registerer) Interface {
29+
discardedLogLinesCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
30+
Name: "logger_rate_limit_discarded_log_lines_total",
31+
Help: "Total number of discarded log lines per level.",
32+
}, []string{"level"})
33+
34+
return &RateLimitedLogger{
35+
next: logger,
36+
limiter: rate.NewLimiter(logsPerSecond, burstSize),
37+
discardedInfoLogLinesCounter: discardedLogLinesCounter.WithLabelValues(infoLevel),
38+
discardedDebugLogLinesCounter: discardedLogLinesCounter.WithLabelValues(debugLevel),
39+
discardedWarnLogLinesCounter: discardedLogLinesCounter.WithLabelValues(warnLevel),
40+
discardedErrorLogLinesCounter: discardedLogLinesCounter.WithLabelValues(errorLevel),
41+
}
42+
}
43+
44+
func (l *RateLimitedLogger) Debugf(format string, args ...interface{}) {
45+
if l.limiter.Allow() {
46+
l.next.Debugf(format, args...)
47+
} else {
48+
l.discardedDebugLogLinesCounter.Inc()
49+
}
50+
}
51+
52+
func (l *RateLimitedLogger) Debugln(args ...interface{}) {
53+
if l.limiter.Allow() {
54+
l.next.Debugln(args...)
55+
} else {
56+
l.discardedDebugLogLinesCounter.Inc()
57+
}
58+
}
59+
60+
func (l *RateLimitedLogger) Infof(format string, args ...interface{}) {
61+
if l.limiter.Allow() {
62+
l.next.Infof(format, args...)
63+
} else {
64+
l.discardedInfoLogLinesCounter.Inc()
65+
}
66+
}
67+
68+
func (l *RateLimitedLogger) Infoln(args ...interface{}) {
69+
if l.limiter.Allow() {
70+
l.next.Infoln(args...)
71+
} else {
72+
l.discardedInfoLogLinesCounter.Inc()
73+
}
74+
}
75+
76+
func (l *RateLimitedLogger) Errorf(format string, args ...interface{}) {
77+
if l.limiter.Allow() {
78+
l.next.Errorf(format, args...)
79+
} else {
80+
l.discardedErrorLogLinesCounter.Inc()
81+
}
82+
}
83+
84+
func (l *RateLimitedLogger) Errorln(args ...interface{}) {
85+
if l.limiter.Allow() {
86+
l.next.Errorln(args...)
87+
} else {
88+
l.discardedErrorLogLinesCounter.Inc()
89+
}
90+
}
91+
92+
func (l *RateLimitedLogger) Warnf(format string, args ...interface{}) {
93+
if l.limiter.Allow() {
94+
l.next.Warnf(format, args...)
95+
} else {
96+
l.discardedWarnLogLinesCounter.Inc()
97+
}
98+
}
99+
100+
func (l *RateLimitedLogger) Warnln(args ...interface{}) {
101+
if l.limiter.Allow() {
102+
l.next.Warnln(args...)
103+
} else {
104+
l.discardedWarnLogLinesCounter.Inc()
105+
}
106+
}
107+
108+
func (l *RateLimitedLogger) WithField(key string, value interface{}) Interface {
109+
return &RateLimitedLogger{
110+
next: l.next.WithField(key, value),
111+
limiter: l.limiter,
112+
discardedInfoLogLinesCounter: l.discardedInfoLogLinesCounter,
113+
discardedDebugLogLinesCounter: l.discardedDebugLogLinesCounter,
114+
discardedWarnLogLinesCounter: l.discardedWarnLogLinesCounter,
115+
discardedErrorLogLinesCounter: l.discardedErrorLogLinesCounter,
116+
}
117+
}
118+
119+
func (l *RateLimitedLogger) WithFields(f Fields) Interface {
120+
return &RateLimitedLogger{
121+
next: l.next.WithFields(f),
122+
limiter: l.limiter,
123+
discardedInfoLogLinesCounter: l.discardedInfoLogLinesCounter,
124+
discardedDebugLogLinesCounter: l.discardedDebugLogLinesCounter,
125+
discardedWarnLogLinesCounter: l.discardedWarnLogLinesCounter,
126+
discardedErrorLogLinesCounter: l.discardedErrorLogLinesCounter,
127+
}
128+
}

log/ratelimit_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package log
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"testing"
7+
"time"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/testutil"
11+
"github.com/sirupsen/logrus"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestRateLimitedLoggerLogs(t *testing.T) {
17+
buf := bytes.NewBuffer(nil)
18+
c := newCounterLogger(buf)
19+
reg := prometheus.NewPedanticRegistry()
20+
r := NewRateLimitedLogger(c, 1, 1, reg)
21+
22+
r.Errorln("Error will be logged")
23+
assert.Equal(t, 1, c.count)
24+
25+
logContains := []string{"error", "Error will be logged"}
26+
c.assertContains(t, logContains)
27+
}
28+
29+
func TestRateLimitedLoggerLimits(t *testing.T) {
30+
buf := bytes.NewBuffer(nil)
31+
c := newCounterLogger(buf)
32+
reg := prometheus.NewPedanticRegistry()
33+
r := NewRateLimitedLogger(c, 2, 2, reg)
34+
35+
r.Errorln("error 1 will be logged")
36+
assert.Equal(t, 1, c.count)
37+
c.assertContains(t, []string{"error", "error 1 will be logged"})
38+
39+
r.Infoln("info 1 will be logged")
40+
assert.Equal(t, 2, c.count)
41+
c.assertContains(t, []string{"info", "info 1 will be logged"})
42+
43+
r.Debugln("debug 1 will be discarded")
44+
assert.Equal(t, 2, c.count)
45+
c.assertNotContains(t, "debug 1 will be discarded")
46+
47+
r.Warnln("warning 1 will be discarded")
48+
assert.Equal(t, 2, c.count)
49+
c.assertNotContains(t, "warning 1 will be discarded")
50+
51+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
52+
# HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level.
53+
# TYPE logger_rate_limit_discarded_log_lines_total counter
54+
logger_rate_limit_discarded_log_lines_total{level="info"} 0
55+
logger_rate_limit_discarded_log_lines_total{level="debug"} 1
56+
logger_rate_limit_discarded_log_lines_total{level="warning"} 1
57+
logger_rate_limit_discarded_log_lines_total{level="error"} 0
58+
`)))
59+
60+
// we wait 1 second, so the next group of lines can be logged
61+
time.Sleep(time.Second)
62+
r.Debugln("debug 2 will be logged")
63+
assert.Equal(t, 3, c.count)
64+
c.assertContains(t, []string{"debug", "debug 2 will be logged"})
65+
66+
r.Infoln("info 2 will be logged")
67+
assert.Equal(t, 4, c.count)
68+
c.assertContains(t, []string{"info", "info 2 will be logged"})
69+
70+
r.Errorln("error 2 will be discarded")
71+
assert.Equal(t, 4, c.count)
72+
c.assertNotContains(t, "error 2 will be discarded")
73+
74+
r.Warnln("warning 2 will be discarded")
75+
assert.Equal(t, 4, c.count)
76+
c.assertNotContains(t, "warning 2 will be discarded")
77+
78+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
79+
# HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level.
80+
# TYPE logger_rate_limit_discarded_log_lines_total counter
81+
logger_rate_limit_discarded_log_lines_total{level="info"} 0
82+
logger_rate_limit_discarded_log_lines_total{level="debug"} 1
83+
logger_rate_limit_discarded_log_lines_total{level="error"} 1
84+
logger_rate_limit_discarded_log_lines_total{level="warning"} 2
85+
`)))
86+
}
87+
88+
func TestRateLimitedLoggerWithFields(t *testing.T) {
89+
buf := bytes.NewBuffer(nil)
90+
c := newCounterLogger(buf)
91+
reg := prometheus.NewPedanticRegistry()
92+
logger := NewRateLimitedLogger(c, 0.0001, 1, reg)
93+
loggerWithFields := logger.WithField("key", "value")
94+
95+
loggerWithFields.Errorln("Error will be logged")
96+
assert.Equal(t, 1, c.count)
97+
c.assertContains(t, []string{"key", "value", "error", "Error will be logged"})
98+
99+
logger.Infoln("Info will not be logged")
100+
c.assertNotContains(t, "Info will not be logged")
101+
102+
loggerWithFields.Debugln("Debug will not be logged")
103+
c.assertNotContains(t, "Debug will not be logged")
104+
105+
loggerWithFields.Warnln("Warning will not be logged")
106+
c.assertNotContains(t, "Warning will not be logged")
107+
assert.Equal(t, 1, c.count)
108+
109+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
110+
# HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level.
111+
# TYPE logger_rate_limit_discarded_log_lines_total counter
112+
logger_rate_limit_discarded_log_lines_total{level="info"} 1
113+
logger_rate_limit_discarded_log_lines_total{level="debug"} 1
114+
logger_rate_limit_discarded_log_lines_total{level="warning"} 1
115+
logger_rate_limit_discarded_log_lines_total{level="error"} 0
116+
`)))
117+
}
118+
119+
type counterLogger struct {
120+
logger Interface
121+
buf *bytes.Buffer
122+
count int
123+
}
124+
125+
func (c *counterLogger) Debugf(format string, args ...interface{}) {
126+
c.logger.Debugf(format, args...)
127+
c.count++
128+
}
129+
130+
func (c *counterLogger) Debugln(args ...interface{}) {
131+
c.logger.Debugln(args...)
132+
c.count++
133+
}
134+
135+
func (c *counterLogger) Infof(format string, args ...interface{}) {
136+
c.logger.Infof(format, args...)
137+
c.count++
138+
}
139+
140+
func (c *counterLogger) Infoln(args ...interface{}) {
141+
c.logger.Infoln(args...)
142+
c.count++
143+
}
144+
145+
func (c *counterLogger) Warnf(format string, args ...interface{}) {
146+
c.logger.Warnf(format, args...)
147+
c.count++
148+
}
149+
150+
func (c *counterLogger) Warnln(args ...interface{}) {
151+
c.logger.Warnln(args...)
152+
c.count++
153+
}
154+
155+
func (c *counterLogger) Errorf(format string, args ...interface{}) {
156+
c.logger.Errorf(format, args...)
157+
c.count++
158+
}
159+
160+
func (c *counterLogger) Errorln(args ...interface{}) {
161+
c.logger.Errorln(args...)
162+
c.count++
163+
}
164+
165+
func (c *counterLogger) WithField(key string, value interface{}) Interface {
166+
c.logger = c.logger.WithField(key, value)
167+
return c
168+
}
169+
170+
func (c *counterLogger) WithFields(fields Fields) Interface {
171+
c.logger = c.logger.WithFields(fields)
172+
return c
173+
}
174+
175+
func (c *counterLogger) assertContains(t *testing.T, logContains []string) {
176+
for _, content := range logContains {
177+
require.True(t, bytes.Contains(c.buf.Bytes(), []byte(content)))
178+
}
179+
}
180+
181+
func (c *counterLogger) assertNotContains(t *testing.T, content string) {
182+
require.False(t, bytes.Contains(c.buf.Bytes(), []byte(content)))
183+
}
184+
185+
func newCounterLogger(buf *bytes.Buffer) *counterLogger {
186+
logrusLogger := logrus.New()
187+
logrusLogger.Out = buf
188+
logrusLogger.Level = logrus.DebugLevel
189+
return &counterLogger{
190+
logger: Logrus(logrusLogger),
191+
buf: buf,
192+
}
193+
}

0 commit comments

Comments
 (0)