Skip to content

Commit f863cf9

Browse files
committed
fix: snapshot handling shouldn't list more than once
1 parent e72863d commit f863cf9

File tree

1 file changed

+44
-23
lines changed

1 file changed

+44
-23
lines changed

node/node.go

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"regexp"
910
"strconv"
1011
"sync"
1112
"time"
@@ -34,6 +35,8 @@ const (
3435
DefaultMaxFilesToList int64 = 1000
3536
)
3637

38+
var snapshotFilenameRegex = regexp.MustCompile(`^hr_(\d+)_s_(\d+).snapshot$`)
39+
3740
// Config holds the configuration for a node
3841
type Config struct {
3942
// NodeID is the ID of this node (0-based)
@@ -223,8 +226,30 @@ func (s *Service) initCaches(ctx context.Context) error {
223226
}
224227
}
225228

229+
// List all files in the bucket
230+
filenamesPrefix := getSnapshotFilenamePrefix()
231+
list := s.storage.ListFilesWithPrefix(ctx, "", filenamesPrefix, s.maxFilesToList)
232+
files, err := list.Next()
233+
if err != nil {
234+
return fmt.Errorf("failed to list snapshot files: %w", err)
235+
}
236+
237+
filesByHashRange := make(map[uint32]string, len(files))
238+
for _, file := range files {
239+
matches := snapshotFilenameRegex.FindStringSubmatch(file.Key)
240+
if len(matches) != 3 {
241+
continue
242+
}
243+
hashRange, err := strconv.Atoi(matches[1])
244+
if err != nil {
245+
s.logger.Warnn("Invalid snapshot filename", logger.NewStringField("filename", file.Key))
246+
continue
247+
}
248+
filesByHashRange[uint32(hashRange)] = file.Key
249+
}
250+
226251
// Create a cache for each hash range
227-
group, _ := kitsync.NewEagerGroup(ctx, len(ranges))
252+
group, gCtx := kitsync.NewEagerGroup(ctx, len(ranges))
228253
for r := range ranges {
229254
// Check if we already have a cache for this range
230255
if _, exists := s.caches[r]; exists {
@@ -245,8 +270,13 @@ func (s *Service) initCaches(ctx context.Context) error {
245270
s.metrics.getKeysCounters[r] = s.stats.NewTaggedStat("keydb_get_keys_count", stats.CountType, statsTags)
246271
s.metrics.putKeysCounter[r] = s.stats.NewTaggedStat("keydb_put_keys_count", stats.CountType, statsTags)
247272

273+
snapshotFile := filesByHashRange[r]
274+
if snapshotFile == "" {
275+
continue
276+
}
277+
248278
group.Go(func() error { // Try to load snapshot for this range
249-
if err := s.loadSnapshot(ctx, r, cache); err != nil {
279+
if err := s.loadSnapshot(gCtx, cache, snapshotFile); err != nil {
250280
if errors.Is(err, filemanager.ErrKeyNotFound) {
251281
s.logger.Warnn("No cached snapshot for range", logger.NewIntField("range", int64(r)))
252282
return nil
@@ -583,7 +613,7 @@ func (s *Service) createSnapshot(ctx context.Context, hashRange uint32, cache Ca
583613
return nil // no data to upload
584614
}
585615

586-
filename := getSnapshotFilenamePrefix(hashRange) + getSnapshotFilenamePostfix(since)
616+
filename := getSnapshotFilenamePrefix() + getSnapshotFilenamePostfix(hashRange, since)
587617

588618
_, err = s.storage.UploadReader(ctx, filename, buf)
589619
if err != nil {
@@ -594,34 +624,25 @@ func (s *Service) createSnapshot(ctx context.Context, hashRange uint32, cache Ca
594624
}
595625

596626
// loadSnapshot loads a snapshot for a specific hash range
597-
func (s *Service) loadSnapshot(ctx context.Context, hashRange uint32, cache Cache) error {
598-
filenamePrefix := getSnapshotFilenamePrefix(hashRange)
599-
list := s.storage.ListFilesWithPrefix(ctx, "", filenamePrefix, s.maxFilesToList)
600-
files, err := list.Next()
627+
func (s *Service) loadSnapshot(ctx context.Context, cache Cache, filename string) error {
628+
buf := aws.NewWriteAtBuffer([]byte{})
629+
err := s.storage.Download(ctx, buf, filename)
601630
if err != nil {
602-
return fmt.Errorf("failed to list snapshot files for range %d: %w", hashRange, err)
631+
return fmt.Errorf("failed to download snapshot file %q: %w", filename, err)
603632
}
604633

605-
for _, file := range files {
606-
buf := aws.NewWriteAtBuffer([]byte{})
607-
err := s.storage.Download(ctx, buf, file.Key)
608-
if err != nil {
609-
return fmt.Errorf("failed to download snapshot file %q: %w", file.Key, err)
610-
}
611-
612-
reader := bytes.NewReader(buf.Bytes())
613-
if err := cache.LoadSnapshot(reader); err != nil {
614-
return fmt.Errorf("failed to load snapshot file %q: %w", file.Key, err)
615-
}
634+
reader := bytes.NewReader(buf.Bytes())
635+
if err := cache.LoadSnapshot(reader); err != nil {
636+
return fmt.Errorf("failed to load snapshot file %q: %w", filename, err)
616637
}
617638

618639
return nil
619640
}
620641

621-
func getSnapshotFilenamePrefix(hashRange uint32) string {
622-
return "hr_" + strconv.Itoa(int(hashRange)) + "_s_"
642+
func getSnapshotFilenamePrefix() string {
643+
return "hr_"
623644
}
624645

625-
func getSnapshotFilenamePostfix(since uint64) string {
626-
return strconv.FormatUint(since, 10) + ".snapshot"
646+
func getSnapshotFilenamePostfix(hashRange uint32, since uint64) string {
647+
return strconv.Itoa(int(hashRange)) + "_s_" + strconv.FormatUint(since, 10) + ".snapshot"
627648
}

0 commit comments

Comments
 (0)