Basic stuck job detection #1097
Conversation
Force-pushed from 74f2aeb to 03bc984
@bgentry Thoughts on this basic shape? I figure I'd follow up with another change that actually does something about the stuck jobs.
Force-pushed from 03bc984 to c85f4df
// In case the executor ever becomes unstuck, inform the
// producer. However, if we got all the way here there's a good
// chance this will never happen (the worker is really stuck and
// will never return).
defer e.ProducerCallbacks.Unstuck()
Doesn't this just run immediately after the above warning log? It's deferring within the inner `go func()` closure, which merely exits after both these defers are added to the stack, and there's nothing to block on the job actually becoming unstuck. Or am I missing something?
Yeah, there was another `<-ctx.Done()` that was missing here. I've added that in, and also improved the test case so that it'll fail in the event that the wait is missing.
I also put in some more logging in the test case so you can run it and verify manually (in case you want/need to) that it's working as expected, e.g.:
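To make the fixed shape concrete, here's a minimal, self-contained sketch of the pattern under discussion. It is not the PR's actual code: `watchForStuckJob`, the `done` channel, and the `unstuck` callback are illustrative stand-ins for the executor's internals.

```go
package main

import (
	"context"
	"log/slog"
	"time"
)

// watchForStuckJob is an illustrative stand-in for the executor's watcher:
// warn once the timeout elapses, then keep waiting so the unstuck callback
// fires only if the worker ever returns.
func watchForStuckJob(ctx context.Context, done <-chan struct{}, timeout time.Duration, unstuck func()) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done: // worker finished in time; nothing to report
		return
	case <-timer.C:
		slog.Warn("Job appears to be stuck", "timeout", timeout)
	}

	// In case the executor ever becomes unstuck, inform the producer.
	defer unstuck()

	// The wait that was missing in the earlier revision: without it, the
	// deferred unstuck callback would run immediately after the warning.
	select {
	case <-done:
	case <-ctx.Done():
	}
}

func main() {
	done := make(chan struct{})
	go watchForStuckJob(context.Background(), done, 5*time.Millisecond, func() {
		slog.Info("Job became unstuck")
	})

	time.Sleep(20 * time.Millisecond) // worker runs past the stuck timeout
	close(done)                       // worker finally returns
	time.Sleep(10 * time.Millisecond) // give the watcher a moment to log
}
```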
$ go test ./internal/jobexecutor -run TestJobExecutor_Execute/StuckDetectionActivates -test.v
=== RUN TestJobExecutor_Execute
=== PAUSE TestJobExecutor_Execute
=== CONT TestJobExecutor_Execute
=== RUN TestJobExecutor_Execute/StuckDetectionActivates
=== PAUSE TestJobExecutor_Execute/StuckDetectionActivates
=== CONT TestJobExecutor_Execute/StuckDetectionActivates
riverdbtest.go:216: Dropped 1 expired postgres schema(s) in 14.537458ms
riverdbtest.go:293: TestSchemaOpts.disableReuse is set; schema not checked in for reuse
job_executor_test.go:715: Generated postgres schema "jobexecutor_2025_12_08t09_03_56_schema_01" with migrations [1 2 3 4 5 6] on line "main" in 63.787208ms [1 generated] [0 reused]
job_executor_test.go:715: TestTx using postgres schema: jobexecutor_2025_12_08t09_03_56_schema_01
job_executor_test.go:724: Job executor reported stuck
logger.go:256: time=2025-12-08T09:03:56.218-05:00 level=WARN msg="jobexecutor.JobExecutor: Job appears to be stuck" job_id=1 kind=jobexecutor_test timeout=5ms
job_executor_test.go:739: Job executor still stuck after wait (this is expected)
logger.go:256: time=2025-12-08T09:03:56.229-05:00 level=INFO msg="jobexecutor.JobExecutor: Job became unstuck" duration=17.011ms job_id=1 kind=jobexecutor_test
job_executor_test.go:728: Job executor reported unstuck (after being stuck)
--- PASS: TestJobExecutor_Execute (0.00s)
--- PASS: TestJobExecutor_Execute/StuckDetectionActivates (0.12s)
PASS
ok github.com/riverqueue/river/internal/jobexecutor 0.299s
internal/jobexecutor/job_executor.go (Outdated)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
AFAICT ctx is the main job context, which could be cancelled under a variety of circumstances (aggressive client shutdown, manual cancellation attempt via UI, etc.). That would lead the `<-ctx.Done()` case below to exit even if the job is actually stuck.
Am I misunderstanding this?
Yeah, on second thought, it makes sense to have a context.WithoutCancel(...) on that thing. Added, and put in a new test case that checks it's doing the right thing.
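As a standalone illustration of what `context.WithoutCancel` (Go 1.21+) changes here, a small sketch with illustrative variable names (not the PR's actual code):

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// jobCtx stands in for the main job context, which may be cancelled by an
	// aggressive client shutdown or a manual cancellation attempt.
	jobCtx, cancelJob := context.WithCancel(context.Background())

	// The watcher derives its context from a copy that ignores the parent's
	// cancellation, so it keeps waiting even after the job context is cancelled.
	watchCtx, cancelWatch := context.WithCancel(context.WithoutCancel(jobCtx))
	defer cancelWatch()

	cancelJob()

	fmt.Println("jobCtx cancelled:", jobCtx.Err() != nil)     // true
	fmt.Println("watchCtx cancelled:", watchCtx.Err() != nil) // false
}
```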
Force-pushed from c85f4df to 9a9702c
Here, try to make some inroads on a feature we've been talking about for a while: detection of stuck jobs.

Unfortunately in Go it's quite easy to accidentally park a job by using a `select` on a channel that won't return and forgetting a separate branch for `<-ctx.Done()` so that it won't respect job timeouts either.

Here, add in some basic detection for that case. Eventually we'd like to give users some options for what to do in case jobs become stuck, but here we do only the simplest things for now: log when we detect a stuck job and count the number of stuck jobs in a producer's stats loop.

In the future we may want to have some additional intelligence like having producers move stuck jobs to a separate bucket up to a certain limit before crashing (the next best option because it's not possible to manually kill goroutines).
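For illustration, a hedged sketch of the foot-gun described above; the worker signature is simplified and not River's actual worker interface.

```go
package main

import (
	"context"
	"fmt"
)

// stuckWorker blocks on a channel receive with no <-ctx.Done() branch, so if
// nothing ever sends on results, the goroutine is parked forever and ignores
// both job timeouts and cancellation.
func stuckWorker(ctx context.Context, results <-chan string) error {
	select {
	case <-results: // blocks forever if no one ever sends
		return nil
		// Missing: case <-ctx.Done(): return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	go func() {
		defer close(done)
		_ = stuckWorker(ctx, make(chan string)) // never returns
	}()

	cancel() // cancellation has no effect; the worker stays parked

	select {
	case <-done:
		fmt.Println("worker returned")
	default:
		fmt.Println("worker is still parked despite cancellation")
	}
}
```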
Force-pushed from 9a9702c to f03d348
@bgentry K, I think we should be up and running now. Mind taking another look?