Skip to content

Commit 517cde8

Browse files
authored
Allow retention periods to be configured to -1 so they're retained indefinitely (#990)
Here, let each of the job cleaner's configured retention periods (`CancelledJobRetentionPeriod`, `CompletedJobRetentionPeriod`, `DiscardedJobRetentionPeriod`) accept the special value of -1 so that the cleaner retains that type of job indefinitely, effectively disabling cleaning for that job type. This probably won't be of much use day-to-day for most users, but does enable us to augment cleaning functionality with special overrides.
1 parent 2edf911 commit 517cde8

File tree

12 files changed

+198
-21
lines changed

12 files changed

+198
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
2828
- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is run many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974).
2929
- A logger passed in via `river.Config` now overrides the default test-based logger when using `rivertest.NewWorker`. [PR #980](https://github.com/riverqueue/river/pull/980).
30+
- Cleaner retention periods (`CancelledJobRetentionPeriod`, `CompletedJobRetentionPeriod`, `DiscardedJobRetentionPeriod`) can be configured to -1 to disable them so that the corresponding type of job is retained indefinitely. [PR #990](https://github.com/riverqueue/river/pull/990).
3031

3132
### Fixed
3233

client.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,24 @@ type Config struct {
9797
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
9898
// around before they're removed permanently.
9999
//
100+
// The special value -1 disables deletion of cancelled jobs.
101+
//
100102
// Defaults to 24 hours.
101103
CancelledJobRetentionPeriod time.Duration
102104

103105
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
104106
// around before they're removed permanently.
105107
//
108+
// The special value -1 disables deletion of completed jobs.
109+
//
106110
// Defaults to 24 hours.
107111
CompletedJobRetentionPeriod time.Duration
108112

109113
// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
110114
// around before they're removed permanently.
111115
//
116+
// The special value -1 disables deletion of discarded jobs.
117+
//
112118
// Defaults to 7 days.
113119
DiscardedJobRetentionPeriod time.Duration
114120

@@ -426,14 +432,14 @@ func (c *Config) WithDefaults() *Config {
426432
}
427433

428434
func (c *Config) validate() error {
429-
if c.CancelledJobRetentionPeriod < 0 {
430-
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero")
435+
if c.CancelledJobRetentionPeriod < -1 {
436+
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero, except for -1 (infinite)")
431437
}
432-
if c.CompletedJobRetentionPeriod < 0 {
433-
return errors.New("CompletedJobRetentionPeriod cannot be less than zero")
438+
if c.CompletedJobRetentionPeriod < -1 {
439+
return errors.New("CompletedJobRetentionPeriod cannot be less than zero, except for -1 (infinite)")
434440
}
435-
if c.DiscardedJobRetentionPeriod < 0 {
436-
return errors.New("DiscardedJobRetentionPeriod cannot be less than zero")
441+
if c.DiscardedJobRetentionPeriod < -1 {
442+
return errors.New("DiscardedJobRetentionPeriod cannot be less than zero, except for -1 (infinite)")
437443
}
438444
if c.FetchCooldown < FetchCooldownMin {
439445
return fmt.Errorf("FetchCooldown must be at least %s", FetchCooldownMin)

client_test.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4600,7 +4600,7 @@ func Test_Client_Maintenance(t *testing.T) {
46004600
riversharedtest.WaitOrTimeout(t, client.queueMaintainer.Started())
46014601
}
46024602

4603-
t.Run("JobCleaner", func(t *testing.T) {
4603+
t.Run("JobCleanerCleans", func(t *testing.T) {
46044604
t.Parallel()
46054605

46064606
config := newTestConfig(t, "")
@@ -4656,6 +4656,40 @@ func Test_Client_Maintenance(t *testing.T) {
46564656
require.NotErrorIs(t, err, ErrNotFound) // still there
46574657
})
46584658

4659+
t.Run("JobCleanerDoesNotCleanWithMinusOneRetention", func(t *testing.T) {
4660+
t.Parallel()
4661+
4662+
config := newTestConfig(t, "")
4663+
config.CancelledJobRetentionPeriod = -1
4664+
config.CompletedJobRetentionPeriod = -1
4665+
config.DiscardedJobRetentionPeriod = -1
4666+
4667+
client, bundle := setup(t, config)
4668+
4669+
// Normal long retention period.
4670+
deleteHorizon := time.Now().Add(-maintenance.DiscardedJobRetentionPeriodDefault)
4671+
4672+
// Take care to insert jobs before starting the client because otherwise
4673+
// there's a race condition where the cleaner could run its initial
4674+
// pass before our insertion is complete.
4675+
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})
4676+
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})
4677+
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})
4678+
4679+
startAndWaitForQueueMaintainer(ctx, t, client)
4680+
4681+
jc := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
4682+
jc.TestSignals.DeletedBatch.WaitOrTimeout()
4683+
4684+
var err error
4685+
_, err = client.JobGet(ctx, job1.ID)
4686+
require.NotErrorIs(t, err, ErrNotFound) // still there
4687+
_, err = client.JobGet(ctx, job2.ID)
4688+
require.NotErrorIs(t, err, ErrNotFound) // still there
4689+
_, err = client.JobGet(ctx, job3.ID)
4690+
require.NotErrorIs(t, err, ErrNotFound) // still there
4691+
})
4692+
46594693
t.Run("JobRescuer", func(t *testing.T) {
46604694
t.Parallel()
46614695

internal/maintenance/job_cleaner.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ type JobCleanerConfig struct {
6060
}
6161

6262
func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
63-
if c.CancelledJobRetentionPeriod <= 0 {
63+
if c.CancelledJobRetentionPeriod < -1 {
6464
panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero")
6565
}
66-
if c.CompletedJobRetentionPeriod <= 0 {
66+
if c.CompletedJobRetentionPeriod < -1 {
6767
panic("JobCleanerConfig.CompletedJobRetentionPeriod must be above zero")
6868
}
69-
if c.DiscardedJobRetentionPeriod <= 0 {
69+
if c.DiscardedJobRetentionPeriod < -1 {
7070
panic("JobCleanerConfig.DiscardedJobRetentionPeriod must be above zero")
7171
}
7272
if c.Interval <= 0 {
@@ -161,12 +161,23 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
161161
for {
162162
// Wrapped in a function so that defers run as expected.
163163
numDeleted, err := func() (int, error) {
164+
// In the special case that all retentions are indefinite, don't
165+
// bother issuing the query at all as an optimization.
166+
if s.Config.CompletedJobRetentionPeriod == -1 &&
167+
s.Config.CancelledJobRetentionPeriod == -1 &&
168+
s.Config.DiscardedJobRetentionPeriod == -1 {
169+
return 0, nil
170+
}
171+
164172
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
165173
defer cancelFunc()
166174

167175
numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
176+
CancelledDoDelete: s.Config.CancelledJobRetentionPeriod != -1,
168177
CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod),
178+
CompletedDoDelete: s.Config.CompletedJobRetentionPeriod != -1,
169179
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
180+
DiscardedDoDelete: s.Config.DiscardedJobRetentionPeriod != -1,
170181
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
171182
Max: s.batchSize,
172183
Schema: s.Config.Schema,

internal/maintenance/job_cleaner_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,104 @@ func TestJobCleaner(t *testing.T) {
134134
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
135135
})
136136

137+
t.Run("DoesNotDeleteWhenRetentionMinusOne", func(t *testing.T) {
138+
t.Parallel()
139+
140+
cleaner, bundle := setup(t)
141+
cleaner.Config.CancelledJobRetentionPeriod = -1
142+
cleaner.Config.CompletedJobRetentionPeriod = -1
143+
cleaner.Config.DiscardedJobRetentionPeriod = -1
144+
145+
cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
146+
completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
147+
discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
148+
149+
require.NoError(t, cleaner.Start(ctx))
150+
151+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
152+
153+
var err error
154+
155+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
156+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
157+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
158+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
159+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
160+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
161+
})
162+
163+
t.Run("DoesNotDeleteCancelledWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
164+
t.Parallel()
165+
166+
cleaner, bundle := setup(t)
167+
cleaner.Config.CancelledJobRetentionPeriod = -1
168+
169+
cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
170+
completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
171+
discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
172+
173+
require.NoError(t, cleaner.Start(ctx))
174+
175+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
176+
177+
var err error
178+
179+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
180+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
181+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
182+
require.ErrorIs(t, err, rivertype.ErrNotFound)
183+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
184+
require.ErrorIs(t, err, rivertype.ErrNotFound)
185+
})
186+
187+
t.Run("DoesNotDeleteCompletedWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
188+
t.Parallel()
189+
190+
cleaner, bundle := setup(t)
191+
cleaner.Config.CompletedJobRetentionPeriod = -1
192+
193+
cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
194+
completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
195+
discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
196+
197+
require.NoError(t, cleaner.Start(ctx))
198+
199+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
200+
201+
var err error
202+
203+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
204+
require.ErrorIs(t, err, rivertype.ErrNotFound)
205+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
206+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
207+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
208+
require.ErrorIs(t, err, rivertype.ErrNotFound)
209+
})
210+
211+
t.Run("DoesNotDeleteDiscardedWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
212+
t.Parallel()
213+
214+
cleaner, bundle := setup(t)
215+
cleaner.Config.DiscardedJobRetentionPeriod = -1
216+
217+
cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
218+
completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
219+
discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
220+
221+
require.NoError(t, cleaner.Start(ctx))
222+
223+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
224+
225+
var err error
226+
227+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
228+
require.ErrorIs(t, err, rivertype.ErrNotFound)
229+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
230+
require.ErrorIs(t, err, rivertype.ErrNotFound)
231+
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
232+
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
233+
})
234+
137235
t.Run("DeletesInBatches", func(t *testing.T) {
138236
t.Parallel()
139237

riverdriver/river_driver_interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,11 @@ type JobDeleteParams struct {
374374
}
375375

376376
type JobDeleteBeforeParams struct {
377+
CancelledDoDelete bool
377378
CancelledFinalizedAtHorizon time.Time
379+
CompletedDoDelete bool
378380
CompletedFinalizedAtHorizon time.Time
381+
DiscardedDoDelete bool
379382
DiscardedFinalizedAtHorizon time.Time
380383
Max int
381384
Schema string

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 10 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,11 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP
258258

259259
func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) {
260260
res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{
261+
CancelledDoDelete: params.CancelledDoDelete,
261262
CancelledFinalizedAtHorizon: params.CancelledFinalizedAtHorizon,
263+
CompletedDoDelete: params.CompletedDoDelete,
262264
CompletedFinalizedAtHorizon: params.CompletedFinalizedAtHorizon,
265+
DiscardedDoDelete: params.DiscardedDoDelete,
263266
DiscardedFinalizedAtHorizon: params.DiscardedFinalizedAtHorizon,
264267
Max: int64(params.Max),
265268
})

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,8 +946,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
946946

947947
// Max two deleted on the first pass.
948948
numDeleted, err := exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
949+
CancelledDoDelete: true,
949950
CancelledFinalizedAtHorizon: horizon,
951+
CompletedDoDelete: true,
950952
CompletedFinalizedAtHorizon: horizon,
953+
DiscardedDoDelete: true,
951954
DiscardedFinalizedAtHorizon: horizon,
952955
Max: 2,
953956
})
@@ -956,8 +959,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
956959

957960
// And one more pass gets the last one.
958961
numDeleted, err = exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
962+
CancelledDoDelete: true,
959963
CancelledFinalizedAtHorizon: horizon,
964+
CompletedDoDelete: true,
960965
CompletedFinalizedAtHorizon: horizon,
966+
DiscardedDoDelete: true,
961967
DiscardedFinalizedAtHorizon: horizon,
962968
Max: 2,
963969
})

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ WHERE id IN (
158158
SELECT id
159159
FROM /* TEMPLATE: schema */river_job
160160
WHERE
161-
(state = 'cancelled' AND finalized_at < @cancelled_finalized_at_horizon::timestamptz) OR
162-
(state = 'completed' AND finalized_at < @completed_finalized_at_horizon::timestamptz) OR
163-
(state = 'discarded' AND finalized_at < @discarded_finalized_at_horizon::timestamptz)
161+
(state = 'cancelled' AND @cancelled_do_delete AND finalized_at < @cancelled_finalized_at_horizon::timestamptz) OR
162+
(state = 'completed' AND @completed_do_delete AND finalized_at < @completed_finalized_at_horizon::timestamptz) OR
163+
(state = 'discarded' AND @discarded_do_delete AND finalized_at < @discarded_finalized_at_horizon::timestamptz)
164164
ORDER BY id
165165
LIMIT @max::bigint
166166
);

0 commit comments

Comments
 (0)