@@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error {
1717
1818 go func () {
1919 if err := s .dequeueTaskExecutions (ctx ); err != nil {
20- log .Error (ctx , "Hooks> runScheduler> dequeueLongRunningTasks> %v" , err )
20+ log .Error (ctx , "runScheduler> dequeueLongRunningTasks> %v" , err )
2121 cancel ()
2222 }
2323 }()
2424
2525 go func () {
2626 if err := s .retryTaskExecutionsRoutine (ctx ); err != nil {
27- log .Error (ctx , "Hooks> runScheduler> retryTaskExecutionsRoutine> %v" , err )
27+ log .Error (ctx , "runScheduler> retryTaskExecutionsRoutine> %v" , err )
2828 cancel ()
2929 }
3030 }()
3131
3232 go func () {
3333 if err := s .enqueueScheduledTaskExecutionsRoutine (ctx ); err != nil {
34- log .Error (ctx , "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v" , err )
34+ log .Error (ctx , "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v" , err )
3535 cancel ()
3636 }
3737 }()
3838
3939 go func () {
4040 if err := s .deleteTaskExecutionsRoutine (ctx ); err != nil {
41- log .Error (ctx , "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v" , err )
41+ log .Error (ctx , "runScheduler> deleteTaskExecutionsRoutine> %v" , err )
4242 cancel ()
4343 }
4444 }()
@@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
5858 case <- tick .C :
5959 size , err := s .Dao .QueueLen ()
6060 if err != nil {
61- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v" , err )
61+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to get queueLen: %v" , err )
6262 continue
6363 }
6464 if size > 20 {
65- log .Warning (ctx , "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d" , size )
65+ log .Warning (ctx , "too many tasks in scheduler for now, skipped this retry ticker. size:%d" , size )
6666 continue
6767 }
6868
6969 if s .Maintenance {
70- log .Info (ctx , "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d" , size )
70+ log .Info (ctx , "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d" , size )
7171 time .Sleep (1 * time .Minute )
7272 continue
7373 }
7474
7575 tasks , err := s .Dao .FindAllTasks (ctx )
7676 if err != nil {
77- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
77+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
7878 continue
7979 }
8080 for _ , t := range tasks {
8181 execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
8282 if err != nil {
83- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
83+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
8484 continue
8585 }
8686 for _ , e := range execs {
@@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
9191 // old hooks
9292 if e .ProcessingTimestamp == 0 && e .Timestamp < time .Now ().Add (- 2 * time .Minute ).UnixNano () {
9393 if e .UUID == "" {
94- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v" , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
94+ log .Warning (ctx , "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v" , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
9595 continue
9696 }
9797 e .Status = TaskExecutionEnqueued
9898 if err := s .Dao .SaveTaskExecution (& e ); err != nil {
99- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v" , e .UUID , err )
99+ log .Warning (ctx , "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v" , e .UUID , err )
100100 continue
101101 }
102- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
102+ log .Warning (ctx , "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
103103 if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
104- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
104+ log .Error (ctx , "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
105105 }
106106 }
107107 if e .NbErrors < s .Cfg .RetryError && e .LastError != "" {
108108 // avoid re-enqueue if the lastError is about a git branch not found
109109 // the branch was deleted from git repository, it will never work
110110 if strings .Contains (e .LastError , "branchName parameter must be provided" ) {
111- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
111+ log .Warning (ctx , "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
112112 if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
113- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
113+ log .Error (ctx , "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
114114 }
115115 continue
116116 }
117117 e .Status = TaskExecutionEnqueued
118118 if err := s .Dao .SaveTaskExecution (& e ); err != nil {
119- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v" , e .UUID , err )
119+ log .Warning (ctx , "retryTaskExecutionsRoutine> unable to save task execution for %s: %v" , e .UUID , err )
120120 continue
121121 }
122- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
122+ log .Warning (ctx , "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
123123 if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
124- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
124+ log .Error (ctx , "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
125125 }
126126 continue
127127 }
@@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
138138 for {
139139 select {
140140 case <- ctx .Done ():
141+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v" , ctx .Err ())
141142 return ctx .Err ()
142143 case <- tick .C :
143144 tasks , err := s .Dao .FindAllTasks (ctx )
144145 if err != nil {
145- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
146+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
146147 continue
147148 }
148149 for _ , t := range tasks {
149150 execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
150151 if err != nil {
151- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
152+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
152153 continue
153154 }
154155 alreadyEnqueued := false
@@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
157158 // update status before enqueue
158159 // this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
159160 if alreadyEnqueued {
160- log .Info (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it" , e .UUID , e .Type )
161+ log .Info (ctx , "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it" , e .UUID , e .Type )
161162 if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
162- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
163+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
163164 }
164165 } else {
165166 e .Status = TaskExecutionEnqueued
166167 s .Dao .SaveTaskExecution (& e )
167- log .Info (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d" , e .Type , e .UUID , e .Timestamp )
168+ log .Info (ctx , "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d" , e .Type , e .UUID , e .Timestamp )
168169 if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
169- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
170+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
170171 }
171172 // this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
172173 if e .Type == TypeRepoPoller || e .Type == TypeScheduler {
@@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
193194 case <- tick .C :
194195 tasks , err := s .Dao .FindAllTasks (ctx )
195196 if err != nil {
196- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
197+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
197198 continue
198199 }
199200 for _ , t := range tasks {
200201 taskToDelete := false
201202 execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
202203 if err != nil {
203- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
204+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
204205 continue
205206 }
206207 sort .Slice (execs , func (i , j int ) bool {
@@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
213214 case TypeBranchDeletion :
214215 if e .Status == TaskExecutionDone && e .ProcessingTimestamp != 0 {
215216 if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
216- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
217+ log .Error (ctx , "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
217218 }
218219 taskToDelete = true
219220 }
220221 default :
221222 if i >= s .Cfg .ExecutionHistory && e .ProcessingTimestamp != 0 {
222223 if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
223- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
224+ log .Error (ctx , "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
224225 }
225226 }
226227 }
@@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
229230
230231 if taskToDelete {
231232 if err := s .deleteTask (ctx , & t ); err != nil {
232- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v" , t .UUID , err )
233+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v" , t .UUID , err )
233234 }
234235 }
235236 }
@@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
241242func (s * Service ) dequeueTaskExecutions (ctx context.Context ) error {
242243 for {
243244 if ctx .Err () != nil {
245+ log .Error (ctx , "dequeueTaskExecutions> exiting go routine: %v" , ctx .Err ())
244246 return ctx .Err ()
245247 }
246248 size , err := s .Dao .QueueLen ()
247249 if err != nil {
248- log .Error (ctx , "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v" , err )
250+ log .Error (ctx , "dequeueTaskExecutions > Unable to get queueLen: %v" , err )
249251 continue
250252 }
251- log .Debug ("Hooks> dequeueTaskExecutions> current queue size: %d" , size )
253+ log .Debug ("dequeueTaskExecutions> current queue size: %d" , size )
252254
253255 if s .Maintenance {
254256 log .Info (ctx , "Maintenance enable, wait 1 minute. Queue %d" , size )
@@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
259261 // Dequeuing context
260262 var taskKey string
261263 if ctx .Err () != nil {
264+ log .Error (ctx , "dequeueTaskExecutions> exiting go routine: %v" , err )
262265 return ctx .Err ()
263266 }
264267 if err := s .Cache .DequeueWithContext (ctx , schedulerQueueKey , & taskKey ); err != nil {
265- log .Error (ctx , "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v" , err )
268+ log .Error (ctx , "dequeueTaskExecutions> store.DequeueWithContext err: %v" , err )
266269 continue
267270 }
268- log .Debug ("Hooks> dequeueTaskExecutions> work on taskKey: %s" , taskKey )
271+ s .Dao .dequeuedIncr ()
272+ log .Info (ctx , "dequeueTaskExecutions> work on taskKey: %s" , taskKey )
269273
270274 // Load the task execution
271275 var t = sdk.TaskExecution {}
@@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
286290
287291 task := s .Dao .FindTask (ctx , t .UUID )
288292 if task == nil {
289- log .Error (ctx , "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution" , t .UUID )
293+ log .Error (ctx , "dequeueTaskExecutions failed: Task %s not found - deleting this task execution" , t .UUID )
290294 t .LastError = "Internal Error: Task not found"
291295 t .NbErrors ++
292296 if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
293- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
297+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
294298 }
295299 continue
296300
297301 } else if t .NbErrors >= s .Cfg .RetryError {
298- log .Info (ctx , "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s" , t .UUID , t .NbErrors , t .LastError )
302+ log .Info (ctx , "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s" , t .UUID , t .NbErrors , t .LastError )
299303 if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
300- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
304+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
301305 }
302306 continue
303307
@@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
307311 saveTaskExecution = true
308312 } else {
309313 saveTaskExecution = true
310- log .Debug ("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s" , taskKey )
314+ log .Debug ("dequeueTaskExecutions> call doTask on taskKey: %s" , taskKey )
311315 var err error
312316 restartTask , err = s .doTask (ctx , task , & t )
313317 if err != nil {
314318 if strings .Contains (err .Error (), "Unsupported task type" ) {
315319 // delete this task execution, as it will never work
316- log .Info (ctx , "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v" , t .UUID , err )
320+ log .Info (ctx , "dequeueTaskExecutions> Deleting task execution %s as err:%v" , t .UUID , err )
317321 if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
318- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
322+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
319323 }
320324 continue
321325 } else {
322- log .Error (ctx , "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v" , t .UUID , t .NbErrors , err )
326+ log .Error (ctx , "dequeueTaskExecutions> %s failed err[%d]: %v" , t .UUID , t .NbErrors , err )
323327 t .LastError = err .Error ()
324328 t .NbErrors ++
325329 saveTaskExecution = true
@@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
336340
337341 //Start (or restart) the task
338342 if restartTask {
339- _ , _ = s .startTask (ctx , task )
343+ _ , err := s .startTask (ctx , task )
344+ if err != nil {
345+ log .Error (ctx , "dequeueTaskExecutions> unable to restart the task %+v after execution: %v" , task , err )
346+ }
340347 }
341348 }
342349}
0 commit comments