@@ -6,17 +6,19 @@ import (
66 "testing"
77 "time"
88
9+ "github.com/ovh/symmecrypt/ciphers/aesgcm"
10+ "github.com/ovh/symmecrypt/convergent"
11+ "github.com/rockbears/log"
12+ "github.com/stretchr/testify/require"
13+
14+ "github.com/ovh/cds/engine/cache"
915 "github.com/ovh/cds/engine/cdn/item"
1016 "github.com/ovh/cds/engine/cdn/lru"
1117 "github.com/ovh/cds/engine/cdn/storage"
1218 cdntest "github.com/ovh/cds/engine/cdn/test"
1319 "github.com/ovh/cds/engine/gorpmapper"
1420 "github.com/ovh/cds/engine/test"
1521 "github.com/ovh/cds/sdk"
16- "github.com/ovh/symmecrypt/ciphers/aesgcm"
17- "github.com/ovh/symmecrypt/convergent"
18- "github.com/rockbears/log"
19- "github.com/stretchr/testify/require"
2022)
2123
2224func TestCleanSynchronizedItem (t * testing.T ) {
@@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) {
503505 require .NoError (t , err )
504506 require .Equal (t , 1 , len (items ))
505507}
508+
509+ func TestCleanSynchronizedReadingItem (t * testing.T ) {
510+ m := gorpmapper .New ()
511+ item .InitDBMapping (m )
512+ storage .InitDBMapping (m )
513+
514+ log .Factory = log .NewTestingWrapper (t )
515+ db , factory , store , cancel := test .SetupPGToCancel (t , m , sdk .TypeCDN )
516+ t .Cleanup (cancel )
517+
518+ cfg := test .LoadTestingConf (t , sdk .TypeCDN )
519+
520+ cdntest .ClearItem (t , context .TODO (), m , db )
521+ cdntest .ClearUnits (t , context .TODO (), m , db )
522+
523+ // Create cdn service
524+ s := Service {
525+ DBConnectionFactory : factory ,
526+ Cache : store ,
527+ Mapper : m ,
528+ }
529+ s .GoRoutines = sdk .NewGoRoutines (context .TODO ())
530+
531+ tmpDir , err := ioutil .TempDir ("" , t .Name ()+ "-cdn-1-*" )
532+ require .NoError (t , err )
533+
534+ tmpDir2 , err := ioutil .TempDir ("" , t .Name ()+ "-cdn-2-*" )
535+ require .NoError (t , err )
536+
537+ ctx , cancel := context .WithTimeout (context .TODO (), 5 * time .Second )
538+ t .Cleanup (cancel )
539+
540+ cdnUnits , err := storage .Init (ctx , m , store , db .DbMap , sdk .NewGoRoutines (ctx ), storage.Configuration {
541+ HashLocatorSalt : "thisismysalt" ,
542+ Buffers : map [string ]storage.BufferConfiguration {
543+ "redis_buffer" : {
544+ Redis : & storage.RedisBufferConfiguration {
545+ Host : cfg ["redisHost" ],
546+ Password : cfg ["redisPassword" ],
547+ },
548+ BufferType : storage .CDNBufferTypeLog ,
549+ },
550+ "file_buffer" : {
551+ Local : & storage.LocalBufferConfiguration {
552+ Path : tmpDir2 ,
553+ },
554+ BufferType : storage .CDNBufferTypeFile ,
555+ },
556+ },
557+ Storages : map [string ]storage.StorageConfiguration {
558+ "fs-backend" : {
559+ Local : & storage.LocalStorageConfiguration {
560+ Path : tmpDir ,
561+ Encryption : []convergent.ConvergentEncryptionConfig {
562+ {
563+ Cipher : aesgcm .CipherName ,
564+ LocatorSalt : "secret_locator_salt" ,
565+ SecretValue : "secret_value" ,
566+ },
567+ },
568+ },
569+ },
570+ "cds-backend" : {
571+ CDS : & storage.CDSStorageConfiguration {
572+ Host : "lolcat.host" ,
573+ Token : "mytoken" ,
574+ },
575+ },
576+ },
577+ })
578+ require .NoError (t , err )
579+ s .Units = cdnUnits
580+
581+ // Add Item in redis / fs/ cds -will be delete from redis
582+ it := sdk.CDNItem {
583+ ID : sdk .UUID (),
584+ Type : sdk .CDNTypeItemRunResult ,
585+ Status : sdk .CDNStatusItemCompleted ,
586+ APIRefHash : sdk .RandomString (10 ),
587+ }
588+ require .NoError (t , item .Insert (context .TODO (), s .Mapper , db , & it ))
589+ iuCDS := sdk.CDNItemUnit {UnitID : s .Units .Storages [1 ].ID (), ItemID : it .ID , Type : it .Type }
590+ require .NoError (t , storage .InsertItemUnit (context .TODO (), s .Mapper , db , & iuCDS ))
591+ iuFileBuf := sdk.CDNItemUnit {UnitID : s .Units .FileBuffer ().ID (), ItemID : it .ID , Type : it .Type }
592+ require .NoError (t , storage .InsertItemUnit (context .TODO (), s .Mapper , db , & iuFileBuf ))
593+ iuFileStorage := sdk.CDNItemUnit {UnitID : s .Units .Storages [0 ].ID (), ItemID : it .ID , Type : it .Type }
594+ require .NoError (t , storage .InsertItemUnit (context .TODO (), s .Mapper , db , & iuFileStorage ))
595+
596+ ///////////////////////////////////////
597+ // 1st test, getItem Lock the item unit
598+ ///////////////////////////////////////
599+ lockKey := cache .Key (storage .FileBufferKey , s .Units .FileBuffer ().ID (), "lock" , iuFileBuf .ID )
600+ hasLocked , err := s .Cache .Lock (lockKey , 5 * time .Second , 0 , 1 )
601+ require .NoError (t , err )
602+ t .Cleanup (func () {
603+ s .Cache .Unlock (lockKey )
604+ })
605+ require .True (t , hasLocked )
606+ require .NoError (t , s .cleanBuffer (context .TODO ()))
607+
608+ _ , err = storage .LoadItemUnitByID (ctx , m , db , iuFileBuf .ID )
609+ require .NoError (t , err )
610+
611+ require .NoError (t , s .Cache .Unlock (lockKey ))
612+
613+ ////////////////////////////////////////////////////////
614+ // 2nd test, getItem is reading the file from the buffer
615+ ////////////////////////////////////////////////////////
616+ readerKey := cache .Key (storage .FileBufferKey , s .Units .FileBuffer ().ID (), "reader" , iuFileBuf .ID , sdk .UUID ())
617+ require .NoError (t , s .Cache .SetWithTTL (readerKey , true , 30 ))
618+ require .NoError (t , s .cleanBuffer (context .TODO ()))
619+
620+ _ , err = storage .LoadItemUnitByID (ctx , m , db , iuFileBuf .ID )
621+ require .NoError (t , err )
622+
623+ require .NoError (t , s .Cache .Delete (readerKey ))
624+ ////////////////////////////////////////////////////////
625+ // 3rd test, mark as delete
626+ ////////////////////////////////////////////////////////
627+ require .NoError (t , s .cleanBuffer (context .TODO ()))
628+
629+ _ , err = storage .LoadItemUnitByID (ctx , m , db , iuFileBuf .ID )
630+ require .True (t , sdk .ErrorIs (err , sdk .ErrNotFound ))
631+ }
0 commit comments