66 "time"
77
88 "github.com/fujiwara/shapeio"
9+ "github.com/go-gorp/gorp"
910 "github.com/rockbears/log"
1011
1112 "github.com/ovh/cds/engine/cache"
@@ -77,15 +78,27 @@ func (x *RunningStorageUnits) FillWithUnknownItems(ctx context.Context, s Storag
7778 return nil
7879}
7980
80- func (x * RunningStorageUnits ) processItem (ctx context.Context , tx gorpmapper.SqlExecutorWithTx , s StorageUnit , id string ) error {
81- it , err := item .LoadAndLockByID (ctx , x .m , tx , id , gorpmapper .GetOptions .WithDecryption )
81+ func (x * RunningStorageUnits ) processItem (ctx context.Context , db * gorp.DbMap , s StorageUnit , id string ) error {
82+ lockKey := cache .Key ("cdn" , "sync" , "item" , id )
83+ hasLock , err := x .cache .Lock (lockKey , 20 * time .Minute , 0 , 1 )
84+ if err != nil {
85+ log .Error (ctx , "unable to get lock %s: %v" , lockKey , err )
86+ }
87+ if ! hasLock {
88+ return nil
89+ }
90+ defer func () {
91+ _ = x .cache .Unlock (lockKey )
92+ }()
93+
94+ it , err := item .LoadByID (ctx , x .m , db , id , gorpmapper .GetOptions .WithDecryption )
8295 if err != nil {
8396 return err
8497 }
8598 ctx = context .WithValue (ctx , FieldAPIRef , it .APIRefHash )
8699 ctx = context .WithValue (ctx , FieldSize , it .Size )
87100 log .Info (ctx , "processing item %s on %s" , it .ID , s .Name ())
88- if _ , err = LoadItemUnitByUnit (ctx , x .m , tx , s .ID (), id ); err == nil {
101+ if _ , err = LoadItemUnitByUnit (ctx , x .m , db , s .ID (), id ); err == nil {
89102 log .Info (ctx , "Item %s already sync on %s" , id , s .Name ())
90103 return nil
91104
@@ -94,31 +107,20 @@ func (x *RunningStorageUnits) processItem(ctx context.Context, tx gorpmapper.Sql
94107 return err
95108 }
96109
97- if err := x .runItem (ctx , tx , s , it ); err != nil {
110+ if err := x .runItem (ctx , db , s , it ); err != nil {
98111 return err
99112 }
100-
113+ x . RemoveFromRedisSyncQueue ( ctx , s , id )
101114 return nil
102115}
103116
104- func (x * RunningStorageUnits ) runItem (ctx context.Context , tx gorpmapper. SqlExecutorWithTx , dest StorageUnit , item * sdk.CDNItem ) error {
117+ func (x * RunningStorageUnits ) runItem (ctx context.Context , db * gorp. DbMap , dest StorageUnit , item * sdk.CDNItem ) error {
105118 iu , err := x .NewItemUnit (ctx , dest , item )
106119 if err != nil {
107120 return err
108121 }
109122 iu .Item = item
110123
111- // Save in database that the item is complete for the storage unit
112- if err := InsertItemUnit (ctx , x .m , tx , iu ); err != nil {
113- return err
114- }
115-
116- // Reload the item unit
117- iu , err = LoadItemUnitByID (ctx , x .m , tx , iu .ID , gorpmapper .GetOptions .WithDecryption )
118- if err != nil {
119- return err
120- }
121-
122124 // Check if the content (based on the locator) is already known from the destination unit
123125 has , err := x .GetItemUnitByLocatorByUnit (iu .Locator , dest .ID (), iu .Type )
124126 if err != nil {
@@ -199,7 +201,17 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec
199201 }
200202
201203 log .Info (ctx , "item %s has been pushed to %s (%.3f s)" , item .ID , dest .Name (), t2 .Sub (t1 ).Seconds ())
202- return nil
204+
205+ tx , err := db .Begin ()
206+ if err != nil {
207+ return sdk .WrapError (err , "unable to start transaction" )
208+ }
209+ defer tx .Rollback () //nolint
210+ // Save in database that the item is complete for the storage unit
211+ if err := InsertItemUnit (ctx , x .m , tx , iu ); err != nil {
212+ return err
213+ }
214+ return sdk .WrapError (tx .Commit (), "unable to commit tx" )
203215}
204216
205217func (x * RunningStorageUnits ) NewItemUnit (_ context.Context , su Interface , i * sdk.CDNItem ) (* sdk.CDNItemUnit , error ) {
0 commit comments