
Commit db6a1b8

Bring reduced batch size to all other standard maintenance services
This one continues #1013 by bringing reduced batch size circuit breakers to all other client maintenance services (previously it was only in `JobScheduler`). We extract some common constants and a couple helpers into `riversharedmaintenance`, then invoke that from each service. `riversharedmaintenance` is used so that the logic can potentially be shared outside the main River project as well. `circuitbreaker` has been moved from an internal package to `rivershared` for the same reason.
1 parent de60166 commit db6a1b8
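
In short, every maintenance service now sizes its batches off a one-way circuit breaker: queries run at the full batch size until enough consecutive timeouts trip the breaker, after which the service sticks to the reduced size for the life of the process. The sketch below only illustrates that shape; `hypotheticalBreaker`, `service`, and `doBatch` are stand-in names, not River APIs (the real pieces live in `rivershared/circuitbreaker` and `rivershared/riversharedmaintenance`).

```go
// Illustrative sketch of the pattern each service adopts below. All names
// here are hypothetical stand-ins; the real types live in rivershared.
package sketch

import (
	"context"
	"errors"
)

// hypotheticalBreaker mimics the subset of the circuit breaker API used by
// the services: it opens after `limit` consecutive trips and never closes.
type hypotheticalBreaker struct{ limit, consecutive int }

func (b *hypotheticalBreaker) Open() bool { return b.consecutive >= b.limit }
func (b *hypotheticalBreaker) Trip()      { b.consecutive++ }
func (b *hypotheticalBreaker) ResetIfNotOpen() {
	if !b.Open() {
		b.consecutive = 0
	}
}

type service struct {
	breaker     *hypotheticalBreaker
	defaultSize int // e.g. 10_000
	reducedSize int // e.g. 1_000
	doBatch     func(ctx context.Context, limit int) (int, error)
}

// batchSize prefers the full batch size, but once the breaker has opened it
// sticks with the reduced size until the process restarts.
func (s *service) batchSize() int {
	if s.breaker.Open() {
		return s.reducedSize
	}
	return s.defaultSize
}

// runOnce runs a single batch and feeds the breaker: a timeout extends the
// consecutive failure streak, while a success clears it (unless the breaker
// is already open, in which case it stays open).
func (s *service) runOnce(ctx context.Context) (int, error) {
	numAffected, err := s.doBatch(ctx, s.batchSize())
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			s.breaker.Trip()
		}
		return 0, err
	}
	s.breaker.ResetIfNotOpen()
	return numAffected, nil
}
```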

File tree

13 files changed: +441, -87 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - The unused function `WorkerDefaults.Hooks` has been removed. This is technically a breaking change, but this function was a vestigal refactoring artifact that was never used by anything, so in practice it shouldn't be breaking. [PR #997](https://github.com/riverqueue/river/pull/997).
 - Periodic job records are upserted immediately through a pilot when a client is started rather than the first time their associated job would run. This doesn't mean they're run immediately (they'll only run if `RunOnStart` is enabled), but rather just tracked immediately. [PR #998](https://github.com/riverqueue/river/pull/998).
 - The job scheduler still schedules jobs in batches of up to 10,000, but when it encounters a series of consecutive timeouts it assumes that the database is in a degraded state and switches to doing work in a smaller batch size of 1,000 jobs. [PR #1013](https://github.com/riverqueue/river/pull/1013).
+- Other maintenance services including the job cleaner, job rescuer, and queue cleaner also prefer a batch size of 10,000, but will fall back to smaller batches of 1,000 on consecutive database timeouts. [PR #1016](https://github.com/riverqueue/river/pull/1016).

 ### Fixed


internal/maintenance/job_cleaner.go

Lines changed: 34 additions & 7 deletions
@@ -10,6 +10,7 @@ import (

     "github.com/riverqueue/river/riverdriver"
     "github.com/riverqueue/river/rivershared/baseservice"
+    "github.com/riverqueue/river/rivershared/circuitbreaker"
     "github.com/riverqueue/river/rivershared/riversharedmaintenance"
     "github.com/riverqueue/river/rivershared/startstop"
     "github.com/riverqueue/river/rivershared/testsignal"
@@ -29,6 +30,8 @@ func (ts *JobCleanerTestSignals) Init(tb testutil.TestingTB) {
 }

 type JobCleanerConfig struct {
+    riversharedmaintenance.BatchSizes
+
     // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
     // around before they're removed permanently.
     //
@@ -62,6 +65,8 @@ type JobCleanerConfig struct {
 }

 func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
+    c.MustValidate()
+
     if c.CancelledJobRetentionPeriod < -1 {
         panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero")
     }
@@ -91,13 +96,23 @@ type JobCleaner struct {
     Config      *JobCleanerConfig
     TestSignals JobCleanerTestSignals

-    batchSize int // configurable for test purposes
-    exec      riverdriver.Executor
+    exec riverdriver.Executor
+
+    // Circuit breaker that tracks consecutive timeout failures from the central
+    // query. The query starts by using the full/default batch size, but after
+    // this breaker trips (after N consecutive timeouts occur in a row), it
+    // switches to a smaller batch. We assume that a database that's degraded is
+    // likely to stay degraded over a longer term, so after the circuit breaks,
+    // it stays broken until the program is restarted.
+    reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker
 }

 func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner {
+    batchSizes := config.WithDefaults()
+
     return baseservice.Init(archetype, &JobCleaner{
         Config: (&JobCleanerConfig{
+            BatchSizes:                  batchSizes,
             CancelledJobRetentionPeriod: cmp.Or(config.CancelledJobRetentionPeriod, riversharedmaintenance.CancelledJobRetentionPeriodDefault),
             CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault),
             DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
@@ -106,9 +121,8 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
             Schema:                      config.Schema,
             Timeout:                     cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault),
         }).mustValidate(),
-
-        batchSize: riversharedmaintenance.BatchSizeDefault,
-        exec:      exec,
+        exec:                    exec,
+        reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes),
     })
 }

@@ -154,6 +168,13 @@ func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
     return nil
 }

+func (s *JobCleaner) batchSize() int {
+    if s.reducedBatchSizeBreaker.Open() {
+        return s.Config.Reduced
+    }
+    return s.Config.Default
+}
+
 type jobCleanerRunOnceResult struct {
     NumJobsDeleted int
 }
@@ -182,25 +203,31 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
                 CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
                 DiscardedDoDelete:           s.Config.DiscardedJobRetentionPeriod != -1,
                 DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
-                Max:                         s.batchSize,
+                Max:                         s.batchSize(),
                 QueuesExcluded:              s.Config.QueuesExcluded,
                 Schema:                      s.Config.Schema,
             })
             if err != nil {
                 return 0, fmt.Errorf("error cleaning jobs: %w", err)
             }

+            s.reducedBatchSizeBreaker.ResetIfNotOpen()
+
             return numDeleted, nil
         }()
         if err != nil {
+            if errors.Is(err, context.DeadlineExceeded) {
+                s.reducedBatchSizeBreaker.Trip()
+            }
+
             return nil, err
         }

         s.TestSignals.DeletedBatch.Signal(struct{}{})

         res.NumJobsDeleted += numDeleted
         // Deleted was less than query `LIMIT` which means work is done.
-        if numDeleted < s.batchSize {
+        if numDeleted < s.batchSize() {
             break
         }

internal/maintenance/job_cleaner_test.go

Lines changed: 74 additions & 2 deletions
@@ -232,11 +232,11 @@ func TestJobCleaner(t *testing.T) {
         t.Parallel()

         cleaner, bundle := setup(t)
-        cleaner.batchSize = 10 // reduced size for test speed
+        cleaner.Config.Default = 10 // reduced size for test speed

         // Add one to our chosen batch size to get one extra job and therefore
         // one extra batch, ensuring that we've tested working multiple.
-        numJobs := cleaner.batchSize + 1
+        numJobs := cleaner.Config.Default + 1

         jobs := make([]*rivertype.JobRow, numJobs)

@@ -365,4 +365,76 @@ func TestJobCleaner(t *testing.T) {
         _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
         require.ErrorIs(t, err, rivertype.ErrNotFound)
     })
+
+    t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) {
+        t.Parallel()
+
+        cleaner, _ := setup(t)
+
+        ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
+        defer cancel()
+
+        // Starts at default batch size.
+        require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+
+        for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
+            _, err := cleaner.runOnce(ctx)
+            require.Error(t, err)
+
+            // Circuit not broken yet so we stay at default batch size.
+            require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+        }
+
+        _, err := cleaner.runOnce(ctx)
+        require.Error(t, err)
+
+        // Circuit now broken. Reduced batch size.
+        require.Equal(t, riversharedmaintenance.BatchSizeReduced, cleaner.batchSize())
+    })
+
+    t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl
+        t.Parallel()
+
+        cleaner, _ := setup(t)
+
+        {
+            ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
+            defer cancel()
+
+            // Starts at default batch size.
+            require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+
+            for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
+                _, err := cleaner.runOnce(ctx)
+                require.Error(t, err)
+
+                // Circuit not broken yet so we stay at default batch size.
+                require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+            }
+        }
+
+        // Context has not been cancelled for this call so it succeeds.
+        _, err := cleaner.runOnce(ctx)
+        require.NoError(t, err)
+
+        require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+
+        // Because of the success above, the circuit breaker resets. N - 1
+        // failures are allowed again before it breaks.
+        {
+            ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
+            defer cancel()
+
+            // Starts at default batch size.
+            require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+
+            for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
+                _, err := cleaner.runOnce(ctx)
+                require.Error(t, err)
+
+                // Circuit not broken yet so we stay at default batch size.
+                require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
+            }
+        }
+    })
 }
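
These tests exercise `Limit()`, `Open()`, `Trip()`, and `ResetIfNotOpen()` on the breaker that now lives in `rivershared/circuitbreaker`, whose implementation isn't shown in this commit. Below is only a minimal consecutive-failure breaker with that same surface, written as an assumption about its behavior (in particular, that once open it never closes); the real package's constructor and internals may differ.

```go
// Minimal consecutive-failure breaker with the surface the tests exercise
// (Limit, Open, Trip, ResetIfNotOpen). The constructor and internals are
// guesses; only the method names come from the diffs in this commit.
package sketch

import "sync"

type CircuitBreaker struct {
	mu       sync.Mutex
	limit    int // consecutive failures tolerated before the circuit opens
	failures int
}

// NewCircuitBreaker is a hypothetical constructor; the real package may be
// configured differently.
func NewCircuitBreaker(limit int) *CircuitBreaker {
	return &CircuitBreaker{limit: limit}
}

// Limit reports how many consecutive trips open the breaker. The tests loop
// Limit()-1 times to assert the breaker hasn't opened prematurely.
func (b *CircuitBreaker) Limit() int { return b.limit }

// Open reports whether the failure limit has been reached. Once open, the
// breaker stays open for the life of the process.
func (b *CircuitBreaker) Open() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.failures >= b.limit
}

// Trip records one more consecutive failure.
func (b *CircuitBreaker) Trip() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
}

// ResetIfNotOpen clears the failure streak after a success, but only while
// the breaker is still closed; an open breaker is never reset.
func (b *CircuitBreaker) ResetIfNotOpen() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failures < b.limit {
		b.failures = 0
	}
}
```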

internal/maintenance/job_rescuer.go

Lines changed: 34 additions & 7 deletions
@@ -13,6 +13,7 @@ import (
     "github.com/riverqueue/river/internal/workunit"
     "github.com/riverqueue/river/riverdriver"
     "github.com/riverqueue/river/rivershared/baseservice"
+    "github.com/riverqueue/river/rivershared/circuitbreaker"
     "github.com/riverqueue/river/rivershared/riversharedmaintenance"
     "github.com/riverqueue/river/rivershared/startstop"
     "github.com/riverqueue/river/rivershared/testsignal"
@@ -40,6 +41,8 @@ func (ts *JobRescuerTestSignals) Init(tb testutil.TestingTB) {
 }

 type JobRescuerConfig struct {
+    riversharedmaintenance.BatchSizes
+
     // ClientRetryPolicy is the default retry policy to use for workers that don't
     // override NextRetry.
     ClientRetryPolicy jobexecutor.ClientRetryPolicy
@@ -59,6 +62,8 @@ type JobRescuerConfig struct {
 }

 func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig {
+    c.MustValidate()
+
     if c.ClientRetryPolicy == nil {
         panic("RescuerConfig.ClientRetryPolicy must be set")
     }
@@ -85,22 +90,31 @@ type JobRescuer struct {
     Config      *JobRescuerConfig
     TestSignals JobRescuerTestSignals

-    batchSize int // configurable for test purposes
-    exec      riverdriver.Executor
+    exec riverdriver.Executor
+
+    // Circuit breaker that tracks consecutive timeout failures from the central
+    // query. The query starts by using the full/default batch size, but after
+    // this breaker trips (after N consecutive timeouts occur in a row), it
+    // switches to a smaller batch. We assume that a database that's degraded is
+    // likely to stay degraded over a longer term, so after the circuit breaks,
+    // it stays broken until the program is restarted.
+    reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker
 }

 func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer {
+    batchSizes := config.WithDefaults()
+
     return baseservice.Init(archetype, &JobRescuer{
         Config: (&JobRescuerConfig{
+            BatchSizes:          batchSizes,
             ClientRetryPolicy:   config.ClientRetryPolicy,
             Interval:            cmp.Or(config.Interval, JobRescuerIntervalDefault),
             RescueAfter:         cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault),
             Schema:              config.Schema,
             WorkUnitFactoryFunc: config.WorkUnitFactoryFunc,
         }).mustValidate(),
-
-        batchSize: riversharedmaintenance.BatchSizeDefault,
-        exec:      exec,
+        exec:                    exec,
+        reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes),
     })
 }

@@ -147,6 +161,13 @@ func (s *JobRescuer) Start(ctx context.Context) error {
     return nil
 }

+func (s *JobRescuer) batchSize() int {
+    if s.reducedBatchSizeBreaker.Open() {
+        return s.Config.Reduced
+    }
+    return s.Config.Default
+}
+
 type rescuerRunOnceResult struct {
     NumJobsCancelled int64
     NumJobsDiscarded int64
@@ -163,9 +184,15 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
     for {
         stuckJobs, err := s.getStuckJobs(ctx)
         if err != nil {
+            if errors.Is(err, context.DeadlineExceeded) {
+                s.reducedBatchSizeBreaker.Trip()
+            }
+
             return nil, fmt.Errorf("error fetching stuck jobs: %w", err)
         }

+        s.reducedBatchSizeBreaker.ResetIfNotOpen()
+
         s.TestSignals.FetchedBatch.Signal(struct{}{})

         now := time.Now().UTC()
@@ -236,7 +263,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)

         // Number of rows fetched was less than query `LIMIT` which means work is
         // done for this round:
-        if len(stuckJobs) < s.batchSize {
+        if len(stuckJobs) < s.batchSize() {
             break
         }

@@ -253,7 +280,7 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
     stuckHorizon := time.Now().Add(-s.Config.RescueAfter)

     return s.exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{
-        Max:          s.batchSize,
+        Max:          s.batchSize(),
         Schema:       s.Config.Schema,
         StuckHorizon: stuckHorizon,
     })
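
Both the cleaner and rescuer diffs lean on helpers whose definitions aren't part of this excerpt: the embedded `riversharedmaintenance.BatchSizes` (with `Default` and `Reduced` fields plus `WithDefaults` and `MustValidate`) and the `ReducedBatchSizeBreaker` constructor. The sketch below is a rough reconstruction of what `BatchSizes` could look like, inferred purely from the call sites above; the 10,000/1,000 values come from the changelog entry, and the actual implementation in `rivershared/riversharedmaintenance` may differ.

```go
// Hypothetical reconstruction of BatchSizes, inferred only from how the job
// cleaner and rescuer use it in this commit. Not the real rivershared code.
package sketch

import "fmt"

const (
	// Values per the changelog: full batches of 10,000, reduced to 1,000.
	BatchSizeDefault = 10_000
	BatchSizeReduced = 1_000
)

// BatchSizes is embedded into each maintenance service config so that every
// service shares the same pair of batch size knobs.
type BatchSizes struct {
	Default int // batch size used while the database looks healthy
	Reduced int // fallback batch size once the breaker has opened
}

// WithDefaults fills in zero values, mirroring how NewJobCleaner and
// NewRescuer call config.WithDefaults() before constructing their breakers.
func (b BatchSizes) WithDefaults() BatchSizes {
	if b.Default == 0 {
		b.Default = BatchSizeDefault
	}
	if b.Reduced == 0 {
		b.Reduced = BatchSizeReduced
	}
	return b
}

// MustValidate panics on a nonsensical configuration, analogous to the
// mustValidate helpers on the service configs that call it.
func (b BatchSizes) MustValidate() {
	if b.Default != 0 && b.Reduced > b.Default {
		panic(fmt.Sprintf("BatchSizes.Reduced (%d) should not exceed BatchSizes.Default (%d)", b.Reduced, b.Default))
	}
}
```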
