Skip to content

fix panic when chunk size exceeds merge buffer size #2068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
offset int64
firstChunkOffset int64
totalSize int64
hasLargeChunk bool
)

var chunks []chunkData
Expand All @@ -516,6 +517,10 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
if !ok {
break
}
// Check if any chunk size exceeds merge buffer size to avoid bounds out of range
if chunkSize > mergeBufferSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not dynamically allocate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dynamic allocation also carries risks. For example, in environments with limited memory, if the chunk size is large and multiple chunks are being downloaded concurrently, it could lead to OOM (out-of-memory) issues—similar to the problem addressed in this PR.

The purpose of making mergeBufferSize configurable is to allow users to set an acceptable buffer size based on their own environment’s resources and the characteristics of their images.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allow users to set an acceptable buffer size based on their own environment’s resources and the characteristics of their images.

It's better to automatically control (reduce or disable) the concurrency not to exceed the buffer limit. Snapshotter should not fail depending on the image size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, when chunkSize > mergeBufferSize, it will automatically fall back to sequential processing using the legacy approach.

hasLargeChunk = true
}
chunks = append(chunks, chunkData{
offset: chunkOffset,
size: chunkSize,
Expand All @@ -530,8 +535,14 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
// cache.PassThrough() is necessary to take over files
r, err := sf.gr.cache.Get(id, cache.PassThrough())
if err != nil {
if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
return 0, err
if hasLargeChunk {
if err := sf.prefetchEntireFileSequential(id); err != nil {
return 0, err
}
} else {
if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
return 0, err
}
}

// just retry once to avoid exception stuck
Expand All @@ -553,6 +564,68 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
return fd, nil
}

// prefetchEntireFileSequential uses the legacy sequential approach for processing chunks
// when chunk size exceeds merge buffer size to avoid slice bounds out of range panic
func (sf *file) prefetchEntireFileSequential(entireCacheID string) error {
w, err := sf.gr.cache.Add(entireCacheID)
if err != nil {
return fmt.Errorf("failed to create cache writer: %w", err)
}
defer w.Close()

var offset int64

for {
chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
if !ok {
break
}

id := genID(sf.id, chunkOffset, chunkSize)
b := sf.gr.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Grow(int(chunkSize))
ip := b.Bytes()[:chunkSize]

if r, err := sf.gr.cache.Get(id); err == nil {
n, err := r.ReadAt(ip, 0)
if (err == nil || err == io.EOF) && int64(n) == chunkSize {
if _, err := w.Write(ip[:n]); err != nil {
r.Close()
sf.gr.putBuffer(b)
w.Abort()
return fmt.Errorf("failed to write cached data: %w", err)
}
offset = chunkOffset + int64(n)
r.Close()
sf.gr.putBuffer(b)
continue
}
r.Close()
}

if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
sf.gr.putBuffer(b)
w.Abort()
return fmt.Errorf("failed to read data: %w", err)
}
if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
sf.gr.putBuffer(b)
w.Abort()
return err
}
if _, err := w.Write(ip); err != nil {
sf.gr.putBuffer(b)
w.Abort()
return fmt.Errorf("failed to write fetched data: %w", err)
}
offset = chunkOffset + chunkSize
sf.gr.putBuffer(b)
}

return w.Commit()
}

type batchWorkerArgs struct {
workerID int
chunks []chunkData
Expand Down
Loading