Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is run many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974).
- A logger passed in via `river.Config` now overrides the default test-based logger when using `rivertest.NewWorker`. [PR #980](https://github.com/riverqueue/river/pull/980).
- Cleaner retention periods (`CancelledJobRetentionPeriod`, `CompletedJobRetentionPeriod`, `DiscardedJobRetentionPeriod`) can each be set to the special value -1 to disable cleaning for that job state, so that jobs in the corresponding state are retained indefinitely. [PR #990](https://github.com/riverqueue/river/pull/990).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably warrants a doc addition to https://riverqueue.com/docs/maintenance-services#cleaner

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a note of it for when we do a release.


### Fixed

Expand Down
18 changes: 12 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,24 @@ type Config struct {
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of cancelled jobs.
//
// Defaults to 24 hours.
CancelledJobRetentionPeriod time.Duration

// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of completed jobs.
//
// Defaults to 24 hours.
CompletedJobRetentionPeriod time.Duration

// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of discarded jobs.
//
// Defaults to 7 days.
DiscardedJobRetentionPeriod time.Duration

Expand Down Expand Up @@ -426,14 +432,14 @@ func (c *Config) WithDefaults() *Config {
}

func (c *Config) validate() error {
if c.CancelledJobRetentionPeriod < 0 {
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero")
if c.CancelledJobRetentionPeriod < -1 {
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero, except for -1 (infinite)")
}
if c.CompletedJobRetentionPeriod < 0 {
return errors.New("CompletedJobRetentionPeriod cannot be less than zero")
if c.CompletedJobRetentionPeriod < -1 {
return errors.New("CompletedJobRetentionPeriod cannot be less than zero, except for -1 (infinite)")
}
if c.DiscardedJobRetentionPeriod < 0 {
return errors.New("DiscardedJobRetentionPeriod cannot be less than zero")
if c.DiscardedJobRetentionPeriod < -1 {
return errors.New("DiscardedJobRetentionPeriod cannot be less than zero, except for -1 (infinite)")
}
if c.FetchCooldown < FetchCooldownMin {
return fmt.Errorf("FetchCooldown must be at least %s", FetchCooldownMin)
Expand Down
36 changes: 35 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4600,7 +4600,7 @@ func Test_Client_Maintenance(t *testing.T) {
riversharedtest.WaitOrTimeout(t, client.queueMaintainer.Started())
}

t.Run("JobCleaner", func(t *testing.T) {
t.Run("JobCleanerCleans", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, "")
Expand Down Expand Up @@ -4656,6 +4656,40 @@ func Test_Client_Maintenance(t *testing.T) {
require.NotErrorIs(t, err, ErrNotFound) // still there
})

// Verifies that configuring all three retention periods to the special value
// -1 leaves cancelled, completed, and discarded jobs in place after the job
// cleaner has run.
t.Run("JobCleanerDoesNotCleanWithMinusOneRetention", func(t *testing.T) {
	t.Parallel()

	config := newTestConfig(t, "")
	config.CancelledJobRetentionPeriod = -1
	config.CompletedJobRetentionPeriod = -1
	config.DiscardedJobRetentionPeriod = -1

	client, bundle := setup(t, config)

	// Normal long retention period. Jobs below are finalized an hour before
	// this horizon so that they'd be old enough for deletion if the default
	// discarded retention applied (presumably the longest of the three
	// defaults; confirm against the maintenance package constants).
	deleteHorizon := time.Now().Add(-maintenance.DiscardedJobRetentionPeriodDefault)

	// Take care to insert jobs before starting the client because otherwise
	// there's a race condition where the cleaner could run its initial
	// pass before our insertion is complete.
	job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})
	job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})
	job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(-1 * time.Hour))})

	startAndWaitForQueueMaintainer(ctx, t, client)

	// Wait on the cleaner's DeletedBatch test signal so that the assertions
	// below run only after the cleaner has had a chance to act.
	jc := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
	jc.TestSignals.DeletedBatch.WaitOrTimeout()

	// Assert with NoError rather than NotErrorIs(ErrNotFound): the latter
	// would also pass if the lookup failed for some unrelated reason, which
	// wouldn't prove that the job is still present.
	var err error
	_, err = client.JobGet(ctx, job1.ID)
	require.NoError(t, err) // still there
	_, err = client.JobGet(ctx, job2.ID)
	require.NoError(t, err) // still there
	_, err = client.JobGet(ctx, job3.ID)
	require.NoError(t, err) // still there
})

t.Run("JobRescuer", func(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 14 additions & 3 deletions internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ type JobCleanerConfig struct {
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
if c.CancelledJobRetentionPeriod <= 0 {
if c.CancelledJobRetentionPeriod < -1 {
panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero")
}
if c.CompletedJobRetentionPeriod <= 0 {
if c.CompletedJobRetentionPeriod < -1 {
panic("JobCleanerConfig.CompletedJobRetentionPeriod must be above zero")
}
if c.DiscardedJobRetentionPeriod <= 0 {
if c.DiscardedJobRetentionPeriod < -1 {
panic("JobCleanerConfig.DiscardedJobRetentionPeriod must be above zero")
}
if c.Interval <= 0 {
Expand Down Expand Up @@ -161,12 +161,23 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
for {
// Wrapped in a function so that defers run as expected.
numDeleted, err := func() (int, error) {
// In the special case that all retentions are indefinite, don't
// bother issuing the query at all as an optimization.
if s.Config.CompletedJobRetentionPeriod == -1 &&
s.Config.CancelledJobRetentionPeriod == -1 &&
s.Config.DiscardedJobRetentionPeriod == -1 {
return 0, nil
}

ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
defer cancelFunc()

numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
CancelledDoDelete: s.Config.CancelledJobRetentionPeriod != -1,
CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod),
CompletedDoDelete: s.Config.CompletedJobRetentionPeriod != -1,
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
DiscardedDoDelete: s.Config.DiscardedJobRetentionPeriod != -1,
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
Max: s.batchSize,
Schema: s.Config.Schema,
Expand Down
98 changes: 98 additions & 0 deletions internal/maintenance/job_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,104 @@ func TestJobCleaner(t *testing.T) {
require.NotErrorIs(t, err, rivertype.ErrNotFound) // still there
})

// Verifies that when every retention period is set to the special value -1,
// the cleaner removes no jobs, even ones finalized before each state's
// delete horizon from the test bundle.
t.Run("DoesNotDeleteWhenRetentionMinusOne", func(t *testing.T) {
	t.Parallel()

	cleaner, bundle := setup(t)
	cleaner.Config.CancelledJobRetentionPeriod = -1
	cleaner.Config.CompletedJobRetentionPeriod = -1
	cleaner.Config.DiscardedJobRetentionPeriod = -1

	// One job per finalized state, each finalized an hour before the
	// bundle's delete horizon for that state.
	cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
	completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
	discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})

	require.NoError(t, cleaner.Start(ctx))

	// Wait on the DeletedBatch test signal so the assertions run only after
	// the cleaner has had a chance to act.
	cleaner.TestSignals.DeletedBatch.WaitOrTimeout()

	// NoError rather than NotErrorIs(ErrNotFound): an unrelated lookup
	// failure must not be mistaken for "job still present".
	var err error

	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
})

// Verifies that setting only the cancelled retention period to -1 keeps
// cancelled jobs while completed and discarded jobs past their horizons are
// still removed.
t.Run("DoesNotDeleteCancelledWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
	t.Parallel()

	cleaner, bundle := setup(t)
	cleaner.Config.CancelledJobRetentionPeriod = -1

	// One job per finalized state, each finalized an hour before the
	// bundle's delete horizon for that state.
	cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
	completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
	discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})

	require.NoError(t, cleaner.Start(ctx))

	// Wait on the DeletedBatch test signal so the assertions run only after
	// the cleaner has had a chance to act.
	cleaner.TestSignals.DeletedBatch.WaitOrTimeout()

	var err error

	// NoError rather than NotErrorIs(ErrNotFound): an unrelated lookup
	// failure must not be mistaken for "job still present".
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
})

// Verifies that setting only the completed retention period to -1 keeps
// completed jobs while cancelled and discarded jobs past their horizons are
// still removed.
t.Run("DoesNotDeleteCompletedWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
	t.Parallel()

	cleaner, bundle := setup(t)
	cleaner.Config.CompletedJobRetentionPeriod = -1

	// One job per finalized state, each finalized an hour before the
	// bundle's delete horizon for that state.
	cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
	completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
	discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})

	require.NoError(t, cleaner.Start(ctx))

	// Wait on the DeletedBatch test signal so the assertions run only after
	// the cleaner has had a chance to act.
	cleaner.TestSignals.DeletedBatch.WaitOrTimeout()

	var err error

	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
	// NoError rather than NotErrorIs(ErrNotFound): an unrelated lookup
	// failure must not be mistaken for "job still present".
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
})

// Verifies that setting only the discarded retention period to -1 keeps
// discarded jobs while cancelled and completed jobs past their horizons are
// still removed.
t.Run("DoesNotDeleteDiscardedWhenRetentionMinusOne", func(t *testing.T) { //nolint:dupl
	t.Parallel()

	cleaner, bundle := setup(t)
	cleaner.Config.DiscardedJobRetentionPeriod = -1

	// One job per finalized state, each finalized an hour before the
	// bundle's delete horizon for that state.
	cancelledJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
	completedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
	discardedJob := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})

	require.NoError(t, cleaner.Start(ctx))

	// Wait on the DeletedBatch test signal so the assertions run only after
	// the cleaner has had a chance to act.
	cleaner.TestSignals.DeletedBatch.WaitOrTimeout()

	var err error

	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: completedJob.ID, Schema: cleaner.Config.Schema})
	require.ErrorIs(t, err, rivertype.ErrNotFound) // deleted
	// NoError rather than NotErrorIs(ErrNotFound): an unrelated lookup
	// failure must not be mistaken for "job still present".
	_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
	require.NoError(t, err) // still there
})

t.Run("DeletesInBatches", func(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 3 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,11 @@ type JobDeleteParams struct {
}

type JobDeleteBeforeParams struct {
CancelledDoDelete bool
CancelledFinalizedAtHorizon time.Time
CompletedDoDelete bool
CompletedFinalizedAtHorizon time.Time
DiscardedDoDelete bool
DiscardedFinalizedAtHorizon time.Time
Max int
Schema string
Expand Down
14 changes: 10 additions & 4 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,11 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP

func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) {
res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{
CancelledDoDelete: params.CancelledDoDelete,
CancelledFinalizedAtHorizon: params.CancelledFinalizedAtHorizon,
CompletedDoDelete: params.CompletedDoDelete,
CompletedFinalizedAtHorizon: params.CompletedFinalizedAtHorizon,
DiscardedDoDelete: params.DiscardedDoDelete,
DiscardedFinalizedAtHorizon: params.DiscardedFinalizedAtHorizon,
Max: int64(params.Max),
})
Expand Down
6 changes: 6 additions & 0 deletions riverdriver/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// Max two deleted on the first pass.
numDeleted, err := exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
CancelledDoDelete: true,
CancelledFinalizedAtHorizon: horizon,
CompletedDoDelete: true,
CompletedFinalizedAtHorizon: horizon,
DiscardedDoDelete: true,
DiscardedFinalizedAtHorizon: horizon,
Max: 2,
})
Expand All @@ -956,8 +959,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// And one more pass gets the last one.
numDeleted, err = exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
CancelledDoDelete: true,
CancelledFinalizedAtHorizon: horizon,
CompletedDoDelete: true,
CompletedFinalizedAtHorizon: horizon,
DiscardedDoDelete: true,
DiscardedFinalizedAtHorizon: horizon,
Max: 2,
})
Expand Down
6 changes: 3 additions & 3 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ WHERE id IN (
SELECT id
FROM /* TEMPLATE: schema */river_job
WHERE
(state = 'cancelled' AND finalized_at < @cancelled_finalized_at_horizon::timestamptz) OR
(state = 'completed' AND finalized_at < @completed_finalized_at_horizon::timestamptz) OR
(state = 'discarded' AND finalized_at < @discarded_finalized_at_horizon::timestamptz)
(state = 'cancelled' AND @cancelled_do_delete AND finalized_at < @cancelled_finalized_at_horizon::timestamptz) OR
(state = 'completed' AND @completed_do_delete AND finalized_at < @completed_finalized_at_horizon::timestamptz) OR
(state = 'discarded' AND @discarded_do_delete AND finalized_at < @discarded_finalized_at_horizon::timestamptz)
ORDER BY id
LIMIT @max::bigint
);
Expand Down
14 changes: 10 additions & 4 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,11 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP

func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) {
res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{
CancelledDoDelete: params.CancelledDoDelete,
CancelledFinalizedAtHorizon: params.CancelledFinalizedAtHorizon,
CompletedDoDelete: params.CompletedDoDelete,
CompletedFinalizedAtHorizon: params.CompletedFinalizedAtHorizon,
DiscardedDoDelete: params.DiscardedDoDelete,
DiscardedFinalizedAtHorizon: params.DiscardedFinalizedAtHorizon,
Max: int64(params.Max),
})
Expand Down
Loading