Commit 74f2aeb

Basic stuck job detection
Here, try to make some inroads on a feature we've been talking about for a while: detection of stuck jobs. Unfortunately, in Go it's quite easy to accidentally park a job forever by `select`ing on a channel that never receives and forgetting a separate `<-ctx.Done()` branch, which also means the job won't respect its timeout. Here, add some basic detection for that case. Eventually we'd like to give users options for what to do when jobs become stuck, but for now we do only the simplest things: log when we detect a stuck job, and count the number of stuck jobs in a producer's stats loop. In the future we may want to add additional intelligence, like having producers move stuck jobs to a separate bucket up to a certain limit before crashing (the next best option, because it's not possible to manually kill goroutines).
1 parent dce66cd commit 74f2aeb
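
As an illustration of the failure mode described above, here's a minimal sketch (the `workStuck`/`workCancelable` functions and their channels are hypothetical, not part of River): the first worker's `select` has no `<-ctx.Done()` branch and can park forever, while the second adds the branch so the executor's cancellation, and with it the job timeout, still takes effect.

```go
package example

import "context"

// workStuck shows the bug: a select with no <-ctx.Done() branch. If neither
// channel ever receives, the goroutine is parked forever and the job ignores
// its timeout because it never observes the executor's cancellation.
func workStuck(ctx context.Context, resultCh <-chan error, doneCh <-chan struct{}) error {
	select {
	case err := <-resultCh:
		return err
	case <-doneCh:
		return nil
	}
}

// workCancelable adds the missing branch so context cancellation (and
// therefore the job timeout) can still unblock the worker.
func workCancelable(ctx context.Context, resultCh <-chan error, doneCh <-chan struct{}) error {
	select {
	case err := <-resultCh:
		return err
	case <-doneCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```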

File tree

4 files changed: +120 −13 lines

4 files changed

+120
-13
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Basic stuck detection after a job's exceeded its timeout and still not returned after the executor's initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).
+
 ## [0.28.0] - 2025-11-23
 
 ### Added

internal/jobexecutor/job_executor.go

Lines changed: 55 additions & 5 deletions
@@ -112,12 +112,17 @@ type JobExecutor struct {
 	ErrorHandler           ErrorHandler
 	HookLookupByJob        *hooklookup.JobHookLookup
 	HookLookupGlobal       hooklookup.HookLookupInterface
-	InformProducerDoneFunc func(jobRow *rivertype.JobRow)
 	JobRow                 *rivertype.JobRow
 	MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
-	SchedulerInterval      time.Duration
-	WorkerMiddleware       []rivertype.WorkerMiddleware
-	WorkUnit               workunit.WorkUnit
+	ProducerCallbacks      struct {
+		Done    func(jobRow *rivertype.JobRow)
+		Stuck   func()
+		Unstuck func()
+	}
+	SchedulerInterval      time.Duration
+	StuckThresholdOverride time.Duration
+	WorkerMiddleware       []rivertype.WorkerMiddleware
+	WorkUnit               workunit.WorkUnit
 
 	// Meant to be used from within the job executor only.
 	start time.Time
@@ -159,7 +164,7 @@ func (e *JobExecutor) Execute(ctx context.Context) {
 		}
 	}
 
-	e.InformProducerDoneFunc(e.JobRow)
+	e.ProducerCallbacks.Done(e.JobRow)
 }
 
 // Executes the job, handling a panic if necessary (and various other error
@@ -171,6 +176,51 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 	metadataUpdates := make(map[string]any)
 	ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates)
 
+	// Watches for jobs that may have become stuck. i.e. They've run longer than
+	// their job timeout (plus a small margin) and don't appear to be responding
+	// to context cancellation (unfortunately, quite an easy error to make in
+	// Go).
+	//
+	// Currently we don't do anything if we notice a job is stuck. Knowing about
+	// stuck jobs is just used for informational purposes in the producer in
+	// generating periodic stats.
+	if e.ClientJobTimeout > 0 {
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		go func() {
+			const stuckThresholdDefault = 5 * time.Second
+
+			select {
+			case <-ctx.Done():
+				// cancellation or execution finished
+
+			case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
+				e.ProducerCallbacks.Stuck()
+
+				e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
+					slog.Int64("job_id", e.JobRow.ID),
+					slog.String("kind", e.JobRow.Kind),
+					slog.Duration("timeout", e.ClientJobTimeout),
+				)
+
+				// In case the executor ever becomes unstuck, inform the
+				// producer. However, if we got all the way here there's a good
+				// chance this will never happen (the worker is really stuck and
+				// will never return).
+				defer e.ProducerCallbacks.Unstuck()
+
+				defer func() {
+					e.Logger.InfoContext(ctx, e.Name+": Job became unstuck",
+						slog.Duration("duration", time.Since(e.start)),
+						slog.Int64("job_id", e.JobRow.ID),
+						slog.String("kind", e.JobRow.Kind),
+					)
+				}()
+			}
+		}()
+	}
+
 	defer func() {
 		if recovery := recover(); recovery != nil {
 			e.Logger.ErrorContext(ctx, e.Name+": panic recovery; possible bug with Worker",
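
To see the mechanism from the hunk above in isolation, here's a standalone sketch of the same watchdog idea. The names (`watchStuck`, `onStuck`, `onUnstuck`) are illustrative rather than River's API, and it reports the unstuck transition by waiting on `ctx.Done()` instead of the executor's `defer`-based approach.

```go
package main

import (
	"cmp"
	"context"
	"fmt"
	"time"
)

// watchStuck fires onStuck if ctx hasn't been cancelled within timeout plus a
// margin, and onUnstuck if the work eventually finishes anyway. The caller
// cancels ctx when the work returns.
func watchStuck(ctx context.Context, timeout, marginOverride time.Duration, onStuck, onUnstuck func()) {
	const marginDefault = 5 * time.Second

	select {
	case <-ctx.Done():
		// Work finished (or was cancelled) before the threshold; nothing to do.
	case <-time.After(timeout + cmp.Or(marginOverride, marginDefault)):
		onStuck()
		// If the work ever does return, ctx gets cancelled and we can report
		// that the job became unstuck after all.
		<-ctx.Done()
		onUnstuck()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go watchStuck(ctx, 10*time.Millisecond, time.Nanosecond,
		func() { fmt.Println("job appears to be stuck") },
		func() { fmt.Println("job became unstuck") },
	)

	time.Sleep(50 * time.Millisecond) // simulate work that outlives its timeout
	cancel()                          // work finally returned
	time.Sleep(10 * time.Millisecond) // give the watchdog time to report
}
```

The `cmp.Or` call mirrors how `StuckThresholdOverride` is handled: any non-zero override wins, otherwise the five-second default applies.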

internal/jobexecutor/job_executor_test.go

Lines changed: 41 additions & 3 deletions
@@ -191,11 +191,19 @@ func TestJobExecutor_Execute(t *testing.T) {
 			ErrorHandler:           bundle.errorHandler,
 			HookLookupByJob:        hooklookup.NewJobHookLookup(),
 			HookLookupGlobal:       hooklookup.NewHookLookup(nil),
-			InformProducerDoneFunc: func(job *rivertype.JobRow) {},
 			JobRow:                 bundle.jobRow,
 			MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
-			SchedulerInterval:      riverinternaltest.SchedulerShortInterval,
-			WorkUnit:               workUnitFactory.MakeUnit(bundle.jobRow),
+			ProducerCallbacks: struct {
+				Done    func(jobRow *rivertype.JobRow)
+				Stuck   func()
+				Unstuck func()
+			}{
+				Done:    func(jobRow *rivertype.JobRow) {},
+				Stuck:   func() {},
+				Unstuck: func() {},
+			},
+			SchedulerInterval: riverinternaltest.SchedulerShortInterval,
+			WorkUnit:          workUnitFactory.MakeUnit(bundle.jobRow),
 		})
 
 		return executor, bundle
@@ -696,6 +704,36 @@ func TestJobExecutor_Execute(t *testing.T) {
 		})
 	})
 
+	t.Run("StuckDetection", func(t *testing.T) {
+		t.Parallel()
+
+		executor, bundle := setup(t)
+
+		executor.ClientJobTimeout = 5 * time.Millisecond
+		executor.StuckThresholdOverride = 1 * time.Nanosecond // must be greater than 0 to take effect
+
+		var (
+			informProducerStuckReceived   = make(chan struct{})
+			informProducerUnstuckReceived = make(chan struct{})
+		)
+		executor.ProducerCallbacks.Stuck = func() {
+			close(informProducerStuckReceived)
+		}
+		executor.ProducerCallbacks.Unstuck = func() {
+			close(informProducerUnstuckReceived)
+		}
+
+		executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error {
+			riversharedtest.WaitOrTimeout(t, informProducerStuckReceived)
+			return nil
+		}, nil).MakeUnit(bundle.jobRow)
+
+		executor.Execute(ctx)
+		_ = riversharedtest.WaitOrTimeout(t, bundle.updateCh)
+
+		riversharedtest.WaitOrTimeout(t, informProducerUnstuckReceived)
+	})
+
 	t.Run("Panic", func(t *testing.T) {
 		t.Parallel()

producer.go

Lines changed: 20 additions & 5 deletions
@@ -209,6 +209,7 @@ type producer struct {
 	// An atomic count of the number of jobs actively being worked on. This is
 	// written to by the main goroutine, but read by the dispatcher.
 	numJobsActive atomic.Int32
+	numJobsStuck  atomic.Int32
 
 	numJobsRan atomic.Uint64
 	paused     bool
@@ -771,20 +772,26 @@ func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 	type jobCount struct {
-		ran    uint64
 		active int
+		ran    uint64
+		stuck  int
 	}
 	var prevCount jobCount
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			curCount := jobCount{ran: p.numJobsRan.Load(), active: int(p.numJobsActive.Load())}
+			curCount := jobCount{
+				active: int(p.numJobsActive.Load()),
+				ran:    p.numJobsRan.Load(),
+				stuck:  int(p.numJobsStuck.Load()),
+			}
 			if curCount != prevCount {
 				p.Logger.InfoContext(ctx, p.Name+": Producer job counts",
 					slog.Uint64("num_completed_jobs", curCount.ran),
 					slog.Int("num_jobs_running", curCount.active),
+					slog.Int("num_jobs_stuck", curCount.stuck),
 					slog.String("queue", p.config.Queue),
 				)
 			}
@@ -815,10 +822,18 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.
 			HookLookupByJob:        p.config.HookLookupByJob,
 			HookLookupGlobal:       p.config.HookLookupGlobal,
 			MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal,
-			InformProducerDoneFunc: p.handleWorkerDone,
 			JobRow:                 job,
-			SchedulerInterval:      p.config.SchedulerInterval,
-			WorkUnit:               workUnit,
+			ProducerCallbacks: struct {
+				Done    func(jobRow *rivertype.JobRow)
+				Stuck   func()
+				Unstuck func()
+			}{
+				Done:    p.handleWorkerDone,
+				Stuck:   func() { p.numJobsStuck.Add(1) },
+				Unstuck: func() { p.numJobsStuck.Add(-1) },
+			},
+			SchedulerInterval: p.config.SchedulerInterval,
+			WorkUnit:          workUnit,
 		})
 		p.addActiveJob(job.ID, executor)
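
On the producer side, here's a small sketch of the bookkeeping this diff wires up (the names are simplified and not River's API): the `Stuck`/`Unstuck` callbacks adjust an atomic gauge, and the stats loop snapshots the counters into a comparable struct so it only logs when something has changed since the previous tick.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type producerStats struct {
	numJobsActive atomic.Int32
	numJobsRan    atomic.Uint64
	numJobsStuck  atomic.Int32
}

// jobCount holds a snapshot of the counters. All fields are comparable, so
// consecutive snapshots can be compared with != to skip redundant log lines.
type jobCount struct {
	active int
	ran    uint64
	stuck  int
}

func main() {
	var (
		stats     producerStats
		prevCount jobCount
	)

	// Callbacks as they'd be handed to an executor.
	stuck := func() { stats.numJobsStuck.Add(1) }
	unstuck := func() { stats.numJobsStuck.Add(-1) }

	// Two executors report stuck jobs; one later recovers.
	stuck()
	stuck()
	unstuck()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for range 3 {
		<-ticker.C
		curCount := jobCount{
			active: int(stats.numJobsActive.Load()),
			ran:    stats.numJobsRan.Load(),
			stuck:  int(stats.numJobsStuck.Load()),
		}
		if curCount != prevCount {
			// Logs only on the first tick here; later ticks see no change.
			fmt.Printf("num_jobs_running=%d num_completed_jobs=%d num_jobs_stuck=%d\n",
				curCount.active, curCount.ran, curCount.stuck)
		}
		prevCount = curCount
	}
}
```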
