4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Basic stuck detection for jobs that have exceeded their timeout and still haven't returned after the executor has initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).

## [0.29.0-rc.1] - 2025-12-04

- Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084).
66 changes: 61 additions & 5 deletions internal/jobexecutor/job_executor.go
@@ -112,12 +112,17 @@ type JobExecutor struct {
ErrorHandler ErrorHandler
HookLookupByJob *hooklookup.JobHookLookup
HookLookupGlobal hooklookup.HookLookupInterface
InformProducerDoneFunc func(jobRow *rivertype.JobRow)
JobRow *rivertype.JobRow
MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
SchedulerInterval time.Duration
WorkerMiddleware []rivertype.WorkerMiddleware
WorkUnit workunit.WorkUnit
ProducerCallbacks struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Unstuck func()
}
SchedulerInterval time.Duration
StuckThresholdOverride time.Duration
WorkerMiddleware []rivertype.WorkerMiddleware
WorkUnit workunit.WorkUnit

// Meant to be used from within the job executor only.
start time.Time
@@ -159,7 +164,7 @@ func (e *JobExecutor) Execute(ctx context.Context) {
}
}

e.InformProducerDoneFunc(e.JobRow)
e.ProducerCallbacks.JobDone(e.JobRow)
}

// Executes the job, handling a panic if necessary (and various other error
@@ -171,6 +176,57 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
metadataUpdates := make(map[string]any)
ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates)

// Watches for jobs that may have become stuck, i.e. they've run longer than
// their job timeout (plus a small margin) and don't appear to be responding
// to context cancellation (unfortunately, quite an easy error to make in
// Go).
//
// Currently we don't do anything if we notice a job is stuck. Knowing about
// stuck jobs is just used for informational purposes in the producer in
// generating periodic stats.
if e.ClientJobTimeout > 0 {
// We add a WithoutCancel here so that this inner goroutine becomes
// immune to all context cancellations _except_ the one where it's
// cancelled because we leave JobExecutor.execute.
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
defer cancel()

go func() {
const stuckThresholdDefault = 5 * time.Second

select {
case <-ctx.Done():
// context cancelled as we leave JobExecutor.execute

case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
e.ProducerCallbacks.Stuck()

e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
slog.Duration("timeout", e.ClientJobTimeout),
)

// context cancelled as we leave JobExecutor.execute
<-ctx.Done()

// In case the executor ever becomes unstuck, inform the
// producer. However, if we got all the way here there's a good
// chance this will never happen (the worker is really stuck and
// will never return).
defer e.ProducerCallbacks.Unstuck()
Comment on lines +213 to +217

Contributor

Doesn't this just run immediately after the above warning log? It's deferring within the inner go func() closure which merely exits after both these defers are added to the stack, and there's nothing to block on the jobs actually becoming unstuck. Or am I missing something?

Contributor Author

Yeah, there was another <- ctx.Done() that was missing here. I've added that in, and also improved the test case so that it'll fail in the event that wait is missing.

I also put in some more logging in the test case so you can run it and verify manually (in case you want/need to) that it's working as expected. e.g.:

$ go test ./internal/jobexecutor -run TestJobExecutor_Execute/StuckDetectionActivates -test.v
=== RUN   TestJobExecutor_Execute
=== PAUSE TestJobExecutor_Execute
=== CONT  TestJobExecutor_Execute
=== RUN   TestJobExecutor_Execute/StuckDetectionActivates
=== PAUSE TestJobExecutor_Execute/StuckDetectionActivates
=== CONT  TestJobExecutor_Execute/StuckDetectionActivates
    riverdbtest.go:216: Dropped 1 expired postgres schema(s) in 14.537458ms
    riverdbtest.go:293: TestSchemaOpts.disableReuse is set; schema not checked in for reuse
    job_executor_test.go:715: Generated postgres schema "jobexecutor_2025_12_08t09_03_56_schema_01" with migrations [1 2 3 4 5 6] on line "main" in 63.787208ms [1 generated] [0 reused]
    job_executor_test.go:715: TestTx using postgres schema: jobexecutor_2025_12_08t09_03_56_schema_01
    job_executor_test.go:724: Job executor reported stuck
    logger.go:256: time=2025-12-08T09:03:56.218-05:00 level=WARN msg="jobexecutor.JobExecutor: Job appears to be stuck" job_id=1 kind=jobexecutor_test timeout=5ms
    job_executor_test.go:739: Job executor still stuck after wait (this is expected)
    logger.go:256: time=2025-12-08T09:03:56.229-05:00 level=INFO msg="jobexecutor.JobExecutor: Job became unstuck" duration=17.011ms job_id=1 kind=jobexecutor_test
    job_executor_test.go:728: Job executor reported unstuck (after being stuck)
--- PASS: TestJobExecutor_Execute (0.00s)
    --- PASS: TestJobExecutor_Execute/StuckDetectionActivates (0.12s)
PASS
ok      github.com/riverqueue/river/internal/jobexecutor        0.299s

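To make the point in this thread concrete, here's a minimal standalone sketch (not part of the PR) of the defer semantics in question: a defer registered inside a goroutine's closure runs as soon as that closure returns, so without the added blocking receive on `ctx.Done()` the unstuck callback would fire immediately after the stuck one.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		// Runs when this closure returns, not when the outer work finishes.
		defer fmt.Println("unstuck reported")

		fmt.Println("stuck reported")

		// Without this receive the closure returns immediately and the defer
		// above fires right away; blocking here ties the unstuck report to
		// the executor actually exiting (which cancels ctx).
		<-ctx.Done()
	}()

	time.Sleep(50 * time.Millisecond) // simulate the job eventually returning
	cancel()                          // executor leaves execute, cancelling ctx
	time.Sleep(10 * time.Millisecond) // let the goroutine observe the cancellation
}
```
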

defer func() {
e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
slog.Duration("duration", time.Since(e.start)),
slog.Int64("job_id", e.JobRow.ID),
slog.String("kind", e.JobRow.Kind),
)
}()
}
}()
}

defer func() {
if recovery := recover(); recovery != nil {
e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",
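As context for the "easy error to make in Go" mentioned in the comment above, here's a hedged sketch of a worker that would trip this stuck detection. `StuckArgs` and `StuckWorker` are invented names for illustration; `river.Worker`, `river.Job`, `river.WorkerDefaults`, and the per-worker `Timeout` override are River's public worker API.

```go
package example

import (
	"context"
	"time"

	"github.com/riverqueue/river"
)

type StuckArgs struct{}

func (StuckArgs) Kind() string { return "stuck_example" }

type StuckWorker struct {
	river.WorkerDefaults[StuckArgs]
}

// Timeout tells the executor to cancel the work context after one second.
func (w *StuckWorker) Timeout(job *river.Job[StuckArgs]) time.Duration { return time.Second }

func (w *StuckWorker) Work(ctx context.Context, job *river.Job[StuckArgs]) error {
	// Bug: time.Sleep never checks ctx, so the executor's cancellation on
	// timeout has no effect and the job is flagged as stuck once the timeout
	// plus the stuck threshold have elapsed.
	time.Sleep(10 * time.Minute)
	return nil

	// A cancellation-aware version would select on ctx.Done() instead:
	//
	//	select {
	//	case <-time.After(10 * time.Minute):
	//		return nil
	//	case <-ctx.Done():
	//		return ctx.Err()
	//	}
}
```
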
102 changes: 99 additions & 3 deletions internal/jobexecutor/job_executor_test.go
@@ -191,11 +191,19 @@ func TestJobExecutor_Execute(t *testing.T) {
ErrorHandler: bundle.errorHandler,
HookLookupByJob: hooklookup.NewJobHookLookup(),
HookLookupGlobal: hooklookup.NewHookLookup(nil),
InformProducerDoneFunc: func(job *rivertype.JobRow) {},
JobRow: bundle.jobRow,
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow),
ProducerCallbacks: struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Unstuck func()
}{
JobDone: func(jobRow *rivertype.JobRow) {},
Stuck: func() {},
Unstuck: func() {},
},
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow),
})

return executor, bundle
@@ -696,6 +704,94 @@ func TestJobExecutor_Execute(t *testing.T) {
})
})

configureStuckDetection := func(executor *JobExecutor) {
executor.ClientJobTimeout = 5 * time.Millisecond
executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
}

t.Run("StuckDetectionActivates", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

configureStuckDetection(executor)

var (
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
executor.ProducerCallbacks.Unstuck = func() {
t.Log("Job executor reported unstuck (after being stuck)")
close(informProducerUnstuckReceived)
}

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)

select {
case <-informProducerUnstuckReceived:
require.FailNow(t, "Executor should not have reported unstuck immediately")
case <-time.After(10 * time.Millisecond):
t.Log("Job executor still stuck after wait (this is expected)")
}

return nil
}, nil).MakeUnit(bundle.jobRow)

executor.Execute(ctx)
_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)

riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
})

// Checks that even if a job's work context is cancelled immediately, stuck
// detection still works as expected.
t.Run("StuckDetectionIgnoresParentContextCancellation", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

configureStuckDetection(executor)

var (
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
executor.ProducerCallbacks.Unstuck = func() {
t.Log("Job executor reported unstuck (after being stuck)")
close(informProducerUnstuckReceived)
}

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)

select {
case <-informProducerUnstuckReceived:
require.FailNow(t, "Executor should not have reported unstuck immediately")
case <-time.After(10 * time.Millisecond):
t.Log("Job executor still stuck after wait (this is expected)")
}

return nil
}, nil).MakeUnit(bundle.jobRow)

ctx, cancel := context.WithCancel(ctx)
cancel() // cancel immediately

executor.Execute(ctx)
_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)

riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
})

t.Run("Panic", func(t *testing.T) {
t.Parallel()

25 changes: 20 additions & 5 deletions producer.go
@@ -209,6 +209,7 @@ type producer struct {
// An atomic count of the number of jobs actively being worked on. This is
// written to by the main goroutine, but read by the dispatcher.
numJobsActive atomic.Int32
numJobsStuck atomic.Int32

numJobsRan atomic.Uint64
paused bool
@@ -771,20 +772,26 @@ func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
type jobCount struct {
ran uint64
active int
ran uint64
stuck int
}
var prevCount jobCount
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
curCount := jobCount{ran: p.numJobsRan.Load(), active: int(p.numJobsActive.Load())}
curCount := jobCount{
active: int(p.numJobsActive.Load()),
ran: p.numJobsRan.Load(),
stuck: int(p.numJobsStuck.Load()),
}
if curCount != prevCount {
p.Logger.InfoContext(ctx, p.Name+": Producer job counts",
slog.Uint64("num_completed_jobs", curCount.ran),
slog.Int("num_jobs_running", curCount.active),
slog.Int("num_jobs_stuck", curCount.stuck),
slog.String("queue", p.config.Queue),
)
}
@@ -815,10 +822,18 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
HookLookupByJob: p.config.HookLookupByJob,
HookLookupGlobal: p.config.HookLookupGlobal,
MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal,
InformProducerDoneFunc: p.handleWorkerDone,
JobRow: job,
SchedulerInterval: p.config.SchedulerInterval,
WorkUnit: workUnit,
ProducerCallbacks: struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Unstuck func()
}{
JobDone: p.handleWorkerDone,
Stuck: func() { p.numJobsStuck.Add(1) },
Unstuck: func() { p.numJobsStuck.Add(-1) },
},
SchedulerInterval: p.config.SchedulerInterval,
WorkUnit: workUnit,
})
p.addActiveJob(job.ID, executor)

14 changes: 11 additions & 3 deletions rivertest/worker.go
@@ -203,13 +203,21 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job
return nil
},
},
InformProducerDoneFunc: func(job *rivertype.JobRow) { close(executionDone) },
HookLookupGlobal: hooklookup.NewHookLookup(w.config.Hooks),
HookLookupByJob: hooklookup.NewJobHookLookup(),
JobRow: job,
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(w.config.Middleware),
SchedulerInterval: maintenance.JobSchedulerIntervalDefault,
WorkUnit: workUnit,
ProducerCallbacks: struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Unstuck func()
}{
JobDone: func(job *rivertype.JobRow) { close(executionDone) },
Stuck: func() {},
Unstuck: func() {},
},
SchedulerInterval: maintenance.JobSchedulerIntervalDefault,
WorkUnit: workUnit,
})

executor.Execute(jobCtx)