@@ -134,7 +134,6 @@ func TestJobRescuer(t *testing.T) {
134
134
135
135
t .Run ("RescuesStuckJobs" , func (t * testing.T ) {
136
136
t .Parallel ()
137
- require := require .New (t )
138
137
139
138
rescuer , bundle := setup (t )
140
139
@@ -163,63 +162,69 @@ func TestJobRescuer(t *testing.T) {
163
162
longTimeOutJob1 := testfactory .Job (ctx , t , bundle .exec , & testfactory.JobOpts {Kind : ptrutil .Ptr (rescuerJobKindLongTimeout ), State : ptrutil .Ptr (rivertype .JobStateRunning ), AttemptedAt : ptrutil .Ptr (bundle .rescueHorizon .Add (- 1 * time .Minute )), MaxAttempts : ptrutil .Ptr (5 )})
164
163
longTimeOutJob2 := testfactory .Job (ctx , t , bundle .exec , & testfactory.JobOpts {Kind : ptrutil .Ptr (rescuerJobKindLongTimeout ), State : ptrutil .Ptr (rivertype .JobStateRunning ), AttemptedAt : ptrutil .Ptr (bundle .rescueHorizon .Add (- 6 * time .Minute )), MaxAttempts : ptrutil .Ptr (5 )})
165
164
166
- require .NoError (rescuer .Start (ctx ))
165
+ require .NoError (t , rescuer .Start (ctx ))
167
166
168
167
rescuer .TestSignals .FetchedBatch .WaitOrTimeout ()
169
168
rescuer .TestSignals .UpdatedBatch .WaitOrTimeout ()
170
169
171
170
confirmRetried := func (jobBefore * rivertype.JobRow ) {
172
171
jobAfter , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : jobBefore .ID , Schema : rescuer .Config .Schema })
173
- require .NoError (err )
174
- require .Equal (rivertype .JobStateRetryable , jobAfter .State )
172
+ require .NoError (t , err )
173
+ require .Equal (t , rivertype .JobStateRetryable , jobAfter .State )
174
+
175
+ require .Len (t , jobAfter .Errors , 1 )
176
+ attemptError := jobAfter .Errors [0 ]
177
+ require .Zero (t , attemptError .Attempt )
178
+ require .Equal (t , "Stuck job rescued by JobRescuer" , attemptError .Error )
179
+ require .Empty (t , attemptError .Trace )
175
180
}
176
181
177
182
var err error
178
183
confirmRetried (stuckToRetryJob1 )
179
184
confirmRetried (stuckToRetryJob2 )
180
185
181
186
job3After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : stuckToRetryJob3 .ID , Schema : rescuer .Config .Schema })
182
- require .NoError (err )
183
- require .Equal (stuckToRetryJob3 .State , job3After .State ) // not rescued
187
+ require .NoError (t , err )
188
+ require .Equal (t , stuckToRetryJob3 .State , job3After .State ) // not rescued
184
189
185
190
discardJob1After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : stuckToDiscardJob1 .ID , Schema : rescuer .Config .Schema })
186
- require .NoError (err )
187
- require .Equal (rivertype .JobStateDiscarded , discardJob1After .State )
188
- require .WithinDuration (time .Now (), * discardJob1After .FinalizedAt , 5 * time .Second )
189
- require .Len (discardJob1After .Errors , 1 )
191
+ require .NoError (t , err )
192
+ require .Equal (t , rivertype .JobStateDiscarded , discardJob1After .State )
193
+ require .WithinDuration (t , time .Now (), * discardJob1After .FinalizedAt , 5 * time .Second )
194
+ require .Len (t , discardJob1After .Errors , 1 )
190
195
191
196
discardJob2After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : stuckToDiscardJob2 .ID , Schema : rescuer .Config .Schema })
192
- require .NoError (err )
193
- require .Equal (rivertype .JobStateRunning , discardJob2After .State )
194
- require .Nil (discardJob2After .FinalizedAt )
197
+ require .NoError (t , err )
198
+ require .Equal (t , rivertype .JobStateRunning , discardJob2After .State )
199
+ require .Nil (t , discardJob2After .FinalizedAt )
195
200
196
201
cancelJob1After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : stuckToCancelJob1 .ID , Schema : rescuer .Config .Schema })
197
- require .NoError (err )
198
- require .Equal (rivertype .JobStateCancelled , cancelJob1After .State )
199
- require .WithinDuration (time .Now (), * cancelJob1After .FinalizedAt , 5 * time .Second )
200
- require .Len (cancelJob1After .Errors , 1 )
202
+ require .NoError (t , err )
203
+ require .Equal (t , rivertype .JobStateCancelled , cancelJob1After .State )
204
+ require .WithinDuration (t , time .Now (), * cancelJob1After .FinalizedAt , 5 * time .Second )
205
+ require .Len (t , cancelJob1After .Errors , 1 )
201
206
202
207
cancelJob2After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : stuckToCancelJob2 .ID , Schema : rescuer .Config .Schema })
203
- require .NoError (err )
204
- require .Equal (rivertype .JobStateRunning , cancelJob2After .State )
205
- require .Nil (cancelJob2After .FinalizedAt )
208
+ require .NoError (t , err )
209
+ require .Equal (t , rivertype .JobStateRunning , cancelJob2After .State )
210
+ require .Nil (t , cancelJob2After .FinalizedAt )
206
211
207
212
notRunningJob1After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : notRunningJob1 .ID , Schema : rescuer .Config .Schema })
208
- require .NoError (err )
209
- require .Equal (notRunningJob1 .State , notRunningJob1After .State )
213
+ require .NoError (t , err )
214
+ require .Equal (t , notRunningJob1 .State , notRunningJob1After .State )
210
215
notRunningJob2After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : notRunningJob2 .ID , Schema : rescuer .Config .Schema })
211
- require .NoError (err )
212
- require .Equal (notRunningJob2 .State , notRunningJob2After .State )
216
+ require .NoError (t , err )
217
+ require .Equal (t , notRunningJob2 .State , notRunningJob2After .State )
213
218
notRunningJob3After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : notRunningJob3 .ID , Schema : rescuer .Config .Schema })
214
- require .NoError (err )
215
- require .Equal (notRunningJob3 .State , notRunningJob3After .State )
219
+ require .NoError (t , err )
220
+ require .Equal (t , notRunningJob3 .State , notRunningJob3After .State )
216
221
217
222
notTimedOutJob1After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : longTimeOutJob1 .ID , Schema : rescuer .Config .Schema })
218
- require .NoError (err )
219
- require .Equal (rivertype .JobStateRunning , notTimedOutJob1After .State )
223
+ require .NoError (t , err )
224
+ require .Equal (t , rivertype .JobStateRunning , notTimedOutJob1After .State )
220
225
notTimedOutJob2After , err := bundle .exec .JobGetByID (ctx , & riverdriver.JobGetByIDParams {ID : longTimeOutJob2 .ID , Schema : rescuer .Config .Schema })
221
- require .NoError (err )
222
- require .Equal (rivertype .JobStateRetryable , notTimedOutJob2After .State )
226
+ require .NoError (t , err )
227
+ require .Equal (t , rivertype .JobStateRetryable , notTimedOutJob2After .State )
223
228
})
224
229
225
230
t .Run ("RescuesInBatches" , func (t * testing.T ) {
0 commit comments