Skip to content

Commit 3d96b86

Browse files
committed
fuse passthrough: fix oom when running huge images
Signed-off-by: abushwang <[email protected]>
1 parent 928a4dd commit 3d96b86

File tree

1 file changed

+42
-13
lines changed

1 file changed

+42
-13
lines changed

fs/reader/reader.go

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
519519
// cache.PassThrough() is necessary to take over files
520520
r, err := sf.gr.cache.Get(id, cache.PassThrough())
521521
if err != nil {
522-
if id, err = sf.prefetchEntireFile(); err != nil {
522+
if err := sf.prefetchEntireFile(id); err != nil {
523523
return 0, err
524524
}
525525

@@ -542,15 +542,24 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
542542
return fd, nil
543543
}
544544

545-
func (sf *file) prefetchEntireFile() (string, error) {
545+
func (sf *file) prefetchEntireFile(entireCacheID string) error {
546546
var (
547547
offset int64
548548
firstChunkOffset int64 = -1
549549
totalSize int64
550550
)
551-
combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer)
552-
combinedBuffer.Reset()
553-
defer sf.gr.putBuffer(combinedBuffer)
551+
552+
w, err := sf.gr.cache.Add(entireCacheID)
553+
if err != nil {
554+
return fmt.Errorf("failed to create cache writer: %w", err)
555+
}
556+
defer w.Close()
557+
558+
seeker, ok := w.(io.Seeker)
559+
if !ok {
560+
w.Abort()
561+
return fmt.Errorf("writer does not support seeking")
562+
}
554563

555564
for {
556565
chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
@@ -572,7 +581,18 @@ func (sf *file) prefetchEntireFile() (string, error) {
572581
if r, err := sf.gr.cache.Get(id); err == nil {
573582
n, err := r.ReadAt(ip, 0)
574583
if (err == nil || err == io.EOF) && int64(n) == chunkSize {
575-
combinedBuffer.Write(ip[:n])
584+
if _, err := seeker.Seek(totalSize, io.SeekStart); err != nil {
585+
r.Close()
586+
sf.gr.putBuffer(b)
587+
w.Abort()
588+
return fmt.Errorf("failed to seek: %w", err)
589+
}
590+
if _, err := w.Write(ip[:n]); err != nil {
591+
r.Close()
592+
sf.gr.putBuffer(b)
593+
w.Abort()
594+
return fmt.Errorf("failed to write cached data: %w", err)
595+
}
576596
totalSize += int64(n)
577597
offset = chunkOffset + int64(n)
578598
r.Close()
@@ -585,21 +605,30 @@ func (sf *file) prefetchEntireFile() (string, error) {
585605
// cache miss, prefetch the whole chunk
586606
if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
587607
sf.gr.putBuffer(b)
588-
return "", fmt.Errorf("failed to read data: %w", err)
608+
w.Abort()
609+
return fmt.Errorf("failed to read data: %w", err)
589610
}
590611
if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
591612
sf.gr.putBuffer(b)
592-
return "", err
613+
w.Abort()
614+
return err
615+
}
616+
if _, err := seeker.Seek(totalSize, io.SeekStart); err != nil {
617+
sf.gr.putBuffer(b)
618+
w.Abort()
619+
return fmt.Errorf("failed to seek: %w", err)
620+
}
621+
if _, err := w.Write(ip); err != nil {
622+
sf.gr.putBuffer(b)
623+
w.Abort()
624+
return fmt.Errorf("failed to write fetched data: %w", err)
593625
}
594-
combinedBuffer.Write(ip)
595626
totalSize += chunkSize
596627
offset = chunkOffset + chunkSize
597628
sf.gr.putBuffer(b)
598629
}
599-
combinedIP := combinedBuffer.Bytes()
600-
combinedID := genID(sf.id, firstChunkOffset, totalSize)
601-
sf.gr.cacheData(combinedIP, combinedID)
602-
return combinedID, nil
630+
631+
return w.Commit()
603632
}
604633

605634
func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr string) error {

0 commit comments

Comments
 (0)