@@ -519,7 +519,7 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
 	// cache.PassThrough() is necessary to take over files
 	r, err := sf.gr.cache.Get(id, cache.PassThrough())
 	if err != nil {
-		if id, err = sf.prefetchEntireFile(); err != nil {
+		if err := sf.prefetchEntireFile(id); err != nil {
 			return 0, err
 		}
 
@@ -542,15 +542,12 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
 	return fd, nil
 }
 
-func (sf *file) prefetchEntireFile() (string, error) {
+func (sf *file) prefetchEntireFile(entireCacheID string) error {
 	var (
 		offset           int64
 		firstChunkOffset int64 = -1
 		totalSize        int64
 	)
-	combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer)
-	combinedBuffer.Reset()
-	defer sf.gr.putBuffer(combinedBuffer)
 
 	for {
 		chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
@@ -572,7 +569,11 @@ func (sf *file) prefetchEntireFile() (string, error) {
 		if r, err := sf.gr.cache.Get(id); err == nil {
 			n, err := r.ReadAt(ip, 0)
 			if (err == nil || err == io.EOF) && int64(n) == chunkSize {
-				combinedBuffer.Write(ip[:n])
+				if err := sf.gr.appendData(ip[:n], entireCacheID); err != nil {
+					r.Close()
+					sf.gr.putBuffer(b)
+					return fmt.Errorf("failed to append cached data: %w", err)
+				}
 				totalSize += int64(n)
 				offset = chunkOffset + int64(n)
 				r.Close()
@@ -585,21 +586,21 @@ func (sf *file) prefetchEntireFile() (string, error) {
 		// cache miss, prefetch the whole chunk
 		if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
 			sf.gr.putBuffer(b)
-			return "", fmt.Errorf("failed to read data: %w", err)
+			return fmt.Errorf("failed to read data: %w", err)
 		}
 		if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
 			sf.gr.putBuffer(b)
-			return "", err
+			return err
+		}
+		if err := sf.gr.appendData(ip, entireCacheID); err != nil {
+			sf.gr.putBuffer(b)
+			return fmt.Errorf("failed to append fetched data: %w", err)
 		}
-		combinedBuffer.Write(ip)
 		totalSize += chunkSize
 		offset = chunkOffset + chunkSize
 		sf.gr.putBuffer(b)
 	}
-	combinedIP := combinedBuffer.Bytes()
-	combinedID := genID(sf.id, firstChunkOffset, totalSize)
-	sf.gr.cacheData(combinedIP, combinedID)
-	return combinedID, nil
+	return nil
 }
 
 func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr string) error {
@@ -624,6 +625,31 @@ func (gr *reader) cacheData(ip []byte, cacheID string) {
 	}
 }
 
+func (gr *reader) appendData(ip []byte, cacheID string) error {
+	r, err := gr.cache.Get(cacheID)
+	if err != nil {
+		gr.cacheData(ip, cacheID)
+		return nil
+	}
+	defer r.Close()
+
+	readerAt := r.GetReaderAt()
+	file, ok := readerAt.(*os.File)
+	if !ok {
+		return fmt.Errorf("the cached ReaderAt is not of type *os.File")
+	}
+
+	if _, err := file.Seek(0, io.SeekEnd); err != nil {
+		return fmt.Errorf("failed to seek to end of file: %w", err)
+	}
+
+	if _, err := file.Write(ip); err != nil {
+		return fmt.Errorf("failed to append data to file: %w", err)
+	}
+
+	return nil
+}
+
 func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr string, cacheID string) error {
 	if err := gr.verifyOneChunk(entryID, ip, chunkDigestStr); err != nil {
 		return err
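For context on the change above: the rewrite drops the in-memory combinedBuffer and instead appends each verified chunk directly to the file backing the whole-file cache entry (appendData seeks to the end of the cached *os.File and writes). Below is a minimal, self-contained sketch of that seek-to-end-and-write append pattern, using a plain temporary file as a stand-in for the cache-backed file; the appendChunk helper and the sample data are hypothetical illustrations, not part of the repository.

package main

import (
	"fmt"
	"io"
	"os"
)

// appendChunk mirrors the pattern used by the new appendData helper:
// seek to the end of the backing file, then write the chunk, so the
// whole file is reassembled incrementally instead of being buffered
// in memory first.
func appendChunk(f *os.File, chunk []byte) error {
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		return fmt.Errorf("failed to seek to end of file: %w", err)
	}
	if _, err := f.Write(chunk); err != nil {
		return fmt.Errorf("failed to append chunk: %w", err)
	}
	return nil
}

func main() {
	// Stand-in for the cache-backed file; in the real code this comes
	// from the cache entry's ReaderAt, type-asserted to *os.File.
	f, err := os.CreateTemp("", "entire-file-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Chunks must be appended in offset order to reproduce the file.
	for _, chunk := range [][]byte{[]byte("chunk-0"), []byte("chunk-1")} {
		if err := appendChunk(f, chunk); err != nil {
			panic(err)
		}
	}

	data, _ := os.ReadFile(f.Name())
	fmt.Printf("reassembled %d bytes: %q\n", len(data), data)
}

This only reassembles the file correctly because the prefetch loop visits chunks in increasing offset order (it advances offset past each chunk) and because the cache entry is backed by a regular file, which is why appendData type-asserts the ReaderAt to *os.File and falls back to cacheData when the entry does not exist yet.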