Skip to content

Commit 7be36dd

Browse files
authored
Add job cleaner excluded queues that can be configured through pilot (#1004)
Add a new pilot function that lets it dictate that certain queues should be excluded from the standard cleaning process that manages retention. Along with the `JobDeleteBefore` driver function picking up a new `QueuesExcluded` parameter, we also add its inverse of `QueuesIncluded` which lets us delete jobs from a specific set of queues.
1 parent d278b65 commit 7be36dd

24 files changed

+492
-212
lines changed

client.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/riverqueue/river/riverdriver"
2828
"github.com/riverqueue/river/rivershared/baseservice"
2929
"github.com/riverqueue/river/rivershared/riverpilot"
30+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
3031
"github.com/riverqueue/river/rivershared/startstop"
3132
"github.com/riverqueue/river/rivershared/testsignal"
3233
"github.com/riverqueue/river/rivershared/util/dbutil"
@@ -403,9 +404,9 @@ func (c *Config) WithDefaults() *Config {
403404

404405
return &Config{
405406
AdvisoryLockPrefix: c.AdvisoryLockPrefix,
406-
CancelledJobRetentionPeriod: cmp.Or(c.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
407-
CompletedJobRetentionPeriod: cmp.Or(c.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
408-
DiscardedJobRetentionPeriod: cmp.Or(c.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
407+
CancelledJobRetentionPeriod: cmp.Or(c.CancelledJobRetentionPeriod, riversharedmaintenance.CancelledJobRetentionPeriodDefault),
408+
CompletedJobRetentionPeriod: cmp.Or(c.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault),
409+
DiscardedJobRetentionPeriod: cmp.Or(c.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
409410
ErrorHandler: c.ErrorHandler,
410411
FetchCooldown: cmp.Or(c.FetchCooldown, FetchCooldownDefault),
411412
FetchPollInterval: cmp.Or(c.FetchPollInterval, FetchPollIntervalDefault),
@@ -864,6 +865,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
864865
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
865866
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
866867
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
868+
QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(),
867869
Schema: config.Schema,
868870
Timeout: config.JobCleanerTimeout,
869871
}, driver.GetExecutor())

client_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/riverqueue/river/riverdriver"
3535
"github.com/riverqueue/river/riverdriver/riverpgxv5"
3636
"github.com/riverqueue/river/rivershared/baseservice"
37+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
3738
"github.com/riverqueue/river/rivershared/riversharedtest"
3839
"github.com/riverqueue/river/rivershared/startstoptest"
3940
"github.com/riverqueue/river/rivershared/testfactory"
@@ -4610,7 +4611,7 @@ func Test_Client_Maintenance(t *testing.T) {
46104611
client, bundle := setup(t, config)
46114612

46124613
// Normal long retention period.
4613-
deleteHorizon := time.Now().Add(-maintenance.DiscardedJobRetentionPeriodDefault)
4614+
deleteHorizon := time.Now().Add(-riversharedmaintenance.DiscardedJobRetentionPeriodDefault)
46144615

46154616
// Take care to insert jobs before starting the client because otherwise
46164617
// there's a race condition where the cleaner could run its initial
@@ -6726,10 +6727,10 @@ func Test_NewClient_Defaults(t *testing.T) {
67266727
require.Zero(t, client.config.AdvisoryLockPrefix)
67276728

67286729
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
6729-
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
6730-
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
6731-
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
6732-
require.Equal(t, maintenance.JobCleanerTimeoutDefault, jobCleaner.Config.Timeout)
6730+
require.Equal(t, riversharedmaintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
6731+
require.Equal(t, riversharedmaintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
6732+
require.Equal(t, riversharedmaintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
6733+
require.Equal(t, riversharedmaintenance.JobCleanerTimeoutDefault, jobCleaner.Config.Timeout)
67336734
require.False(t, jobCleaner.StaggerStartupIsDisabled())
67346735

67356736
enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)

internal/maintenance/job_cleaner.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/riverqueue/river/riverdriver"
1212
"github.com/riverqueue/river/rivershared/baseservice"
13+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
1314
"github.com/riverqueue/river/rivershared/startstop"
1415
"github.com/riverqueue/river/rivershared/testsignal"
1516
"github.com/riverqueue/river/rivershared/util/randutil"
@@ -18,14 +19,6 @@ import (
1819
"github.com/riverqueue/river/rivershared/util/timeutil"
1920
)
2021

21-
const (
22-
CancelledJobRetentionPeriodDefault = 24 * time.Hour
23-
CompletedJobRetentionPeriodDefault = 24 * time.Hour
24-
DiscardedJobRetentionPeriodDefault = 7 * 24 * time.Hour
25-
JobCleanerIntervalDefault = 30 * time.Second
26-
JobCleanerTimeoutDefault = 30 * time.Second
27-
)
28-
2922
// Test-only properties.
3023
type JobCleanerTestSignals struct {
3124
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
@@ -38,19 +31,28 @@ func (ts *JobCleanerTestSignals) Init(tb testutil.TestingTB) {
3831
type JobCleanerConfig struct {
3932
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
4033
// around before they're removed permanently.
34+
//
35+
// The special value -1 disables deletion of cancelled jobs.
4136
CancelledJobRetentionPeriod time.Duration
4237

4338
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
4439
// around before they're removed permanently.
40+
//
41+
// The special value -1 disables deletion of completed jobs.
4542
CompletedJobRetentionPeriod time.Duration
4643

4744
// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
4845
// around before they're removed permanently.
46+
//
47+
// The special value -1 disables deletion of discarded jobs.
4948
DiscardedJobRetentionPeriod time.Duration
5049

5150
// Interval is the amount of time to wait between runs of the cleaner.
5251
Interval time.Duration
5352

53+
// QueuesExcluded are queues that'll be excluded from cleaning.
54+
QueuesExcluded []string
55+
5456
// Schema where River tables are located. Empty string omits schema, causing
5557
// Postgres to default to `search_path`.
5658
Schema string
@@ -82,7 +84,7 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
8284
// JobCleaner periodically removes finalized jobs that are cancelled, completed,
8385
// or discarded. Each state's retention time can be configured individually.
8486
type JobCleaner struct {
85-
queueMaintainerServiceBase
87+
riversharedmaintenance.QueueMaintainerServiceBase
8688
startstop.BaseStartStop
8789

8890
// exported for test purposes
@@ -96,15 +98,16 @@ type JobCleaner struct {
9698
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner {
9799
return baseservice.Init(archetype, &JobCleaner{
98100
Config: (&JobCleanerConfig{
99-
CancelledJobRetentionPeriod: cmp.Or(config.CancelledJobRetentionPeriod, CancelledJobRetentionPeriodDefault),
100-
CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
101-
DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
102-
Interval: cmp.Or(config.Interval, JobCleanerIntervalDefault),
101+
CancelledJobRetentionPeriod: cmp.Or(config.CancelledJobRetentionPeriod, riversharedmaintenance.CancelledJobRetentionPeriodDefault),
102+
CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault),
103+
DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
104+
QueuesExcluded: config.QueuesExcluded,
105+
Interval: cmp.Or(config.Interval, riversharedmaintenance.JobCleanerIntervalDefault),
103106
Schema: config.Schema,
104-
Timeout: cmp.Or(config.Timeout, JobCleanerTimeoutDefault),
107+
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault),
105108
}).mustValidate(),
106109

107-
batchSize: BatchSizeDefault,
110+
batchSize: riversharedmaintenance.BatchSizeDefault,
108111
exec: exec,
109112
})
110113
}
@@ -121,8 +124,8 @@ func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
121124
started()
122125
defer stopped() // this defer should come first so it's last out
123126

124-
s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStarted)
125-
defer s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStopped)
127+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
128+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
126129

127130
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
128131
for {
@@ -141,7 +144,7 @@ func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
141144
}
142145

143146
if res.NumJobsDeleted > 0 {
144-
s.Logger.InfoContext(ctx, s.Name+logPrefixRanSuccessfully,
147+
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
145148
slog.Int("num_jobs_deleted", res.NumJobsDeleted),
146149
)
147150
}
@@ -180,6 +183,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
180183
DiscardedDoDelete: s.Config.DiscardedJobRetentionPeriod != -1,
181184
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
182185
Max: s.batchSize,
186+
QueuesExcluded: s.Config.QueuesExcluded,
183187
Schema: s.Config.Schema,
184188
})
185189
if err != nil {
@@ -204,7 +208,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
204208
slog.Int("num_jobs_deleted", numDeleted),
205209
)
206210

207-
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(BatchBackoffMin, BatchBackoffMax))
211+
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(riversharedmaintenance.BatchBackoffMin, riversharedmaintenance.BatchBackoffMax))
208212
}
209213

210214
return res, nil

internal/maintenance/job_cleaner_test.go

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/riverqueue/river/riverdbtest"
1111
"github.com/riverqueue/river/riverdriver"
1212
"github.com/riverqueue/river/riverdriver/riverpgxv5"
13+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
1314
"github.com/riverqueue/river/rivershared/riversharedtest"
1415
"github.com/riverqueue/river/rivershared/startstoptest"
1516
"github.com/riverqueue/river/rivershared/testfactory"
@@ -34,20 +35,15 @@ func TestJobCleaner(t *testing.T) {
3435

3536
tx := riverdbtest.TestTxPgx(ctx, t)
3637
bundle := &testBundle{
37-
cancelledDeleteHorizon: time.Now().Add(-CancelledJobRetentionPeriodDefault),
38-
completedDeleteHorizon: time.Now().Add(-CompletedJobRetentionPeriodDefault),
38+
cancelledDeleteHorizon: time.Now().Add(-riversharedmaintenance.CancelledJobRetentionPeriodDefault),
39+
completedDeleteHorizon: time.Now().Add(-riversharedmaintenance.CompletedJobRetentionPeriodDefault),
3940
exec: riverpgxv5.New(nil).UnwrapExecutor(tx),
40-
discardedDeleteHorizon: time.Now().Add(-DiscardedJobRetentionPeriodDefault),
41+
discardedDeleteHorizon: time.Now().Add(-riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
4142
}
4243

4344
cleaner := NewJobCleaner(
4445
riversharedtest.BaseServiceArchetype(t),
45-
&JobCleanerConfig{
46-
CancelledJobRetentionPeriod: CancelledJobRetentionPeriodDefault,
47-
CompletedJobRetentionPeriod: CompletedJobRetentionPeriodDefault,
48-
DiscardedJobRetentionPeriod: DiscardedJobRetentionPeriodDefault,
49-
Interval: JobCleanerIntervalDefault,
50-
},
46+
&JobCleanerConfig{},
5147
bundle.exec)
5248
cleaner.StaggerStartupDisable(true)
5349
cleaner.TestSignals.Init(t)
@@ -59,13 +55,13 @@ func TestJobCleaner(t *testing.T) {
5955
t.Run("Defaults", func(t *testing.T) {
6056
t.Parallel()
6157

62-
cleaner := NewJobCleaner(riversharedtest.BaseServiceArchetype(t), &JobCleanerConfig{}, nil)
58+
cleaner, _ := setup(t)
6359

64-
require.Equal(t, CancelledJobRetentionPeriodDefault, cleaner.Config.CancelledJobRetentionPeriod)
65-
require.Equal(t, CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod)
66-
require.Equal(t, DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod)
67-
require.Equal(t, JobCleanerIntervalDefault, cleaner.Config.Interval)
68-
require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.Timeout)
60+
require.Equal(t, riversharedmaintenance.CancelledJobRetentionPeriodDefault, cleaner.Config.CancelledJobRetentionPeriod)
61+
require.Equal(t, riversharedmaintenance.CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod)
62+
require.Equal(t, riversharedmaintenance.DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod)
63+
require.Equal(t, riversharedmaintenance.JobCleanerIntervalDefault, cleaner.Config.Interval)
64+
require.Equal(t, riversharedmaintenance.JobCleanerTimeoutDefault, cleaner.Config.Timeout)
6965
})
7066

7167
t.Run("StartStopStress", func(t *testing.T) {
@@ -78,7 +74,7 @@ func TestJobCleaner(t *testing.T) {
7874
startstoptest.Stress(ctx, t, cleaner)
7975
})
8076

81-
t.Run("DeletesCompletedJobs", func(t *testing.T) {
77+
t.Run("DeletesCancelledCompletedAndDiscardedJobs", func(t *testing.T) {
8278
t.Parallel()
8379

8480
cleaner, bundle := setup(t)
@@ -331,4 +327,42 @@ func TestJobCleaner(t *testing.T) {
331327
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID, Schema: cleaner.Config.Schema})
332328
require.ErrorIs(t, err, rivertype.ErrNotFound)
333329
})
330+
331+
t.Run("OmmittedQueues", func(t *testing.T) {
332+
t.Parallel()
333+
334+
cleaner, bundle := setup(t)
335+
336+
var (
337+
cancelledJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
338+
completedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
339+
discardedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
340+
341+
omittedQueue1 = "omitted1"
342+
omittedQueue2 = "omitted1"
343+
344+
// Not deleted because in an omitted queue.
345+
omittedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
346+
omittedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
347+
)
348+
349+
cleaner.Config.QueuesExcluded = []string{omittedQueue1, omittedQueue2}
350+
351+
require.NoError(t, cleaner.Start(ctx))
352+
353+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
354+
355+
var err error
356+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob1.ID, Schema: cleaner.Config.Schema})
357+
require.NoError(t, err)
358+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob2.ID, Schema: cleaner.Config.Schema})
359+
require.NoError(t, err)
360+
361+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
362+
require.ErrorIs(t, err, rivertype.ErrNotFound)
363+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
364+
require.ErrorIs(t, err, rivertype.ErrNotFound)
365+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
366+
require.ErrorIs(t, err, rivertype.ErrNotFound)
367+
})
334368
}

internal/maintenance/job_rescuer.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/riverqueue/river/internal/workunit"
1414
"github.com/riverqueue/river/riverdriver"
1515
"github.com/riverqueue/river/rivershared/baseservice"
16+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
1617
"github.com/riverqueue/river/rivershared/startstop"
1718
"github.com/riverqueue/river/rivershared/testsignal"
1819
"github.com/riverqueue/river/rivershared/util/randutil"
@@ -77,7 +78,7 @@ func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig {
7778
// JobRescuer periodically rescues jobs that have been executing for too long
7879
// and are considered to be "stuck".
7980
type JobRescuer struct {
80-
queueMaintainerServiceBase
81+
riversharedmaintenance.QueueMaintainerServiceBase
8182
startstop.BaseStartStop
8283

8384
// exported for test purposes
@@ -98,7 +99,7 @@ func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec
9899
WorkUnitFactoryFunc: config.WorkUnitFactoryFunc,
99100
}).mustValidate(),
100101

101-
batchSize: BatchSizeDefault,
102+
batchSize: riversharedmaintenance.BatchSizeDefault,
102103
exec: exec,
103104
})
104105
}
@@ -115,8 +116,8 @@ func (s *JobRescuer) Start(ctx context.Context) error {
115116
started()
116117
defer stopped() // this defer should come first so it's last out
117118

118-
s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStarted)
119-
defer s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStopped)
119+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
120+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
120121

121122
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
122123
for {
@@ -135,7 +136,7 @@ func (s *JobRescuer) Start(ctx context.Context) error {
135136
}
136137

137138
if res.NumJobsDiscarded > 0 || res.NumJobsRetried > 0 {
138-
s.Logger.InfoContext(ctx, s.Name+logPrefixRanSuccessfully,
139+
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
139140
slog.Int64("num_jobs_discarded", res.NumJobsDiscarded),
140141
slog.Int64("num_jobs_retry_scheduled", res.NumJobsRetried),
141142
)
@@ -239,7 +240,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
239240
break
240241
}
241242

242-
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(BatchBackoffMin, BatchBackoffMax))
243+
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(riversharedmaintenance.BatchBackoffMin, riversharedmaintenance.BatchBackoffMax))
243244
}
244245

245246
return res, nil

internal/maintenance/job_scheduler.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/riverqueue/river/riverdriver"
1212
"github.com/riverqueue/river/rivershared/baseservice"
13+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
1314
"github.com/riverqueue/river/rivershared/startstop"
1415
"github.com/riverqueue/river/rivershared/testsignal"
1516
"github.com/riverqueue/river/rivershared/util/randutil"
@@ -71,7 +72,7 @@ func (c *JobSchedulerConfig) mustValidate() *JobSchedulerConfig {
7172
// which are ready to run over to `available` so that they're eligible to be
7273
// worked.
7374
type JobScheduler struct {
74-
queueMaintainerServiceBase
75+
riversharedmaintenance.QueueMaintainerServiceBase
7576
startstop.BaseStartStop
7677

7778
// exported for test purposes
@@ -105,8 +106,8 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl
105106
started()
106107
defer stopped() // this defer should come first so it's last out
107108

108-
s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStarted)
109-
defer s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStopped)
109+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
110+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
110111

111112
ticker := timeutil.NewTickerWithInitialTick(ctx, s.config.Interval)
112113
for {
@@ -125,7 +126,7 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl
125126
}
126127

127128
if res.NumCompletedJobsScheduled > 0 {
128-
s.Logger.InfoContext(ctx, s.Name+logPrefixRanSuccessfully,
129+
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
129130
slog.Int("num_jobs_scheduled", res.NumCompletedJobsScheduled),
130131
)
131132
}
@@ -203,7 +204,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
203204
break
204205
}
205206

206-
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(BatchBackoffMin, BatchBackoffMax))
207+
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(riversharedmaintenance.BatchBackoffMin, riversharedmaintenance.BatchBackoffMax))
207208
}
208209

209210
return res, nil

0 commit comments

Comments
 (0)