@@ -413,7 +413,7 @@ func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runI
413413 return getAll (ctx , db , query )
414414}
415415
416- func ResyncWorkflowRunResultsRoutine (ctx context.Context , DBFunc func () * gorp.DbMap , delay time.Duration ) {
416+ func ResyncWorkflowRunResultsRoutine (ctx context.Context , DBFunc func () * gorp.DbMap , store cache. Store , delay time.Duration ) {
417417 tick := time .NewTicker (delay )
418418 defer tick .Stop ()
419419
@@ -433,21 +433,35 @@ func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.Db
433433 continue
434434 }
435435 for _ , id := range ids {
436+ lockKey := cache .Key ("api:resyncWorkflowRunResults" , fmt .Sprintf ("%d" , id ))
437+ b , err := store .Lock (lockKey , 5 * time .Minute , 0 , 1 )
438+ if err != nil {
439+ log .ErrorWithStackTrace (ctx , err )
440+ continue
441+ }
442+ if ! b {
443+ log .Debug (ctx , "api.resyncWorkflowRunResults> workflow run %d is locked in cache" , id )
444+ continue
445+ }
436446 tx , err := DBFunc ().Begin ()
437447 if err != nil {
438448 log .ErrorWithStackTrace (ctx , sdk .WithStack (err ))
449+ _ = store .Unlock (lockKey )
439450 continue
440451 }
441452 if err := SyncRunResultArtifactManagerByRunID (ctx , tx , id ); err != nil {
442453 log .ErrorWithStackTrace (ctx , err )
443454 tx .Rollback ()
455+ _ = store .Unlock (lockKey )
444456 continue
445457 }
446458 if err := tx .Commit (); err != nil {
447459 log .ErrorWithStackTrace (ctx , sdk .WithStack (err ))
448460 tx .Rollback ()
461+ _ = store .Unlock (lockKey )
449462 continue
450463 }
464+ _ = store .Unlock (lockKey )
451465 }
452466 }
453467 }
0 commit comments