Skip to content

Commit 81d37f9

Browse files
committed
chore: custom backup draft
1 parent f13e91c commit 81d37f9

File tree

2 files changed

+226
-49
lines changed

2 files changed

+226
-49
lines changed

internal/cache/badger/badger.go

Lines changed: 216 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,63 @@
11
package badger
22

33
import (
4+
"bytes"
45
"compress/gzip"
6+
"context"
7+
"encoding/binary"
58
"errors"
69
"fmt"
710
"io"
8-
"os"
9-
"path"
11+
"strconv"
1012
"strings"
1113
"sync"
1214
"time"
1315

1416
"github.com/dgraph-io/badger/v4"
1517
"github.com/dgraph-io/badger/v4/options"
18+
"github.com/dgraph-io/badger/v4/pb"
19+
"github.com/dgraph-io/ristretto/v2/z"
20+
"google.golang.org/protobuf/proto"
1621

1722
"github.com/rudderlabs/rudder-go-kit/bytesize"
1823
"github.com/rudderlabs/rudder-go-kit/config"
1924
"github.com/rudderlabs/rudder-go-kit/logger"
25+
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
2026
)
2127

2228
var ErrSnapshotInProgress = errors.New("snapshotting already in progress")
2329

30+
type hasher interface {
31+
GetKeysByHashRange(keys []string) (
32+
map[uint32][]string, // itemsByHashRange
33+
error,
34+
)
35+
GetKeysByHashRangeWithIndexes(keys []string) (
36+
map[uint32][]string, // itemsByHashRange
37+
map[string]int, // indexes
38+
error,
39+
)
40+
}
41+
2442
type Cache struct {
43+
hasher hasher
2544
cache *badger.DB
45+
conf *config.Config
46+
logger logger.Logger
2647
compress bool
2748
discardRatio float64
2849
snapshotSince uint64
2950
snapshotting bool
3051
snapshottingLock sync.Mutex
52+
debugMode bool
3153
}
3254

33-
func Factory(conf *config.Config, log logger.Logger) func(hashRange uint32) (*Cache, error) {
34-
return func(hashRange uint32) (*Cache, error) {
35-
badgerPath := conf.GetString("BadgerDB.Dedup.Path", "/tmp/badger")
36-
badgerPath = path.Join(badgerPath, fmt.Sprintf("%d", hashRange))
37-
if err := os.MkdirAll(badgerPath, 0o755); err != nil {
38-
return nil, fmt.Errorf("failed to create badger directory for hash range %d: %w", hashRange, err)
39-
}
40-
if conf.GetBool("BadgerDB.TestMode", false) {
41-
// TODO use this to enable usage of String() string and Len() methods
42-
}
43-
badgerCache, err := New(badgerPath, conf, log)
44-
if err != nil {
45-
return nil, fmt.Errorf("failed to create cache factory: %w", err)
46-
}
47-
return badgerCache, nil
48-
}
49-
}
50-
51-
func New(path string, conf *config.Config, log logger.Logger) (*Cache, error) {
55+
func New(h hasher, conf *config.Config, log logger.Logger) (*Cache, error) {
56+
path := conf.GetString("BadgerDB.Dedup.Path", "/tmp/badger")
5257
opts := badger.DefaultOptions(path).
5358
WithCompression(options.None).
54-
WithNumGoroutines(1).
5559
WithNumVersionsToKeep(1).
60+
WithNumGoroutines(conf.GetInt("BadgerDB.Dedup.NumGoroutines", 128/3)).
5661
WithBloomFalsePositive(conf.GetFloat64("BadgerDB.Dedup.BloomFalsePositive", 0.000001)).
5762
WithIndexCacheSize(conf.GetInt64Var(16*bytesize.MB, 1, "BadgerDB.Dedup.indexCacheSize", "BadgerDB.indexCacheSize")).
5863
WithValueLogFileSize(conf.GetInt64Var(1*bytesize.MB, 1, "BadgerDB.Dedup.valueLogFileSize", "BadgerDB.valueLogFileSize")).
@@ -65,7 +70,7 @@ func New(path string, conf *config.Config, log logger.Logger) (*Cache, error) {
6570
WithBaseLevelSize(conf.GetInt64Var(5*bytesize.MB, 1, "BadgerDB.Dedup.baseLevelSize", "BadgerDB.baseLevelSize")).
6671
WithLevelSizeMultiplier(conf.GetIntVar(10, 1, "BadgerDB.Dedup.levelSizeMultiplier", "BadgerDB.levelSizeMultiplier")).
6772
WithMaxLevels(conf.GetIntVar(7, 1, "BadgerDB.Dedup.maxLevels", "BadgerDB.maxLevels")).
68-
WithNumCompactors(conf.GetIntVar(2, 1, "BadgerDB.Dedup.numCompactors", "BadgerDB.numCompactors")).
73+
WithNumCompactors(conf.GetIntVar(4, 1, "BadgerDB.Dedup.numCompactors", "BadgerDB.numCompactors")).
6974
WithValueThreshold(conf.GetInt64Var(10*bytesize.B, 1, "BadgerDB.Dedup.valueThreshold", "BadgerDB.valueThreshold")).
7075
WithSyncWrites(conf.GetBoolVar(false, "BadgerDB.Dedup.syncWrites", "BadgerDB.syncWrites")).
7176
WithBlockCacheSize(conf.GetInt64Var(0, 1, "BadgerDB.Dedup.blockCacheSize", "BadgerDB.blockCacheSize")).
@@ -84,28 +89,39 @@ func New(path string, conf *config.Config, log logger.Logger) (*Cache, error) {
8489
return nil, err
8590
}
8691
return &Cache{
92+
hasher: h,
8793
cache: db,
94+
conf: conf,
95+
logger: log,
8896
compress: compress,
8997
discardRatio: conf.GetFloat64("BadgerDB.Dedup.DiscardRatio", 0.7),
98+
debugMode: conf.GetBool("BadgerDB.DebugMode", false),
9099
}, nil
91100
}
92101

93102
// Get returns the values associated with the keys and an error if the operation failed
94103
func (c *Cache) Get(keys []string) ([]bool, error) {
104+
itemsByHashRange, indexes, err := c.hasher.GetKeysByHashRangeWithIndexes(keys)
105+
if err != nil {
106+
return nil, fmt.Errorf("cache get keys: %w", err)
107+
}
108+
95109
results := make([]bool, len(keys))
96110

97-
err := c.cache.View(func(txn *badger.Txn) error {
98-
for i, key := range keys {
99-
_, err := txn.Get([]byte(key))
100-
if err != nil {
101-
if errors.Is(err, badger.ErrKeyNotFound) {
102-
results[i] = false
103-
continue
111+
err = c.cache.View(func(txn *badger.Txn) error {
112+
for hashRange, keys := range itemsByHashRange {
113+
for _, key := range keys {
114+
_, err := txn.Get(c.getKey(key, hashRange))
115+
if err != nil {
116+
if errors.Is(err, badger.ErrKeyNotFound) {
117+
results[indexes[key]] = false
118+
continue
119+
}
120+
return fmt.Errorf("failed to get key %s: %w", key, err)
104121
}
105-
return fmt.Errorf("failed to get key %s: %w", key, err)
122+
// Key exists, value is true
123+
results[indexes[key]] = true
106124
}
107-
// Key exists, value is true
108-
results[i] = true
109125
}
110126
return nil
111127
})
@@ -118,27 +134,39 @@ func (c *Cache) Get(keys []string) ([]bool, error) {
118134

119135
// Put adds or updates elements inside the cache with the specified TTL and returns an error if the operation failed
120136
func (c *Cache) Put(keys []string, ttl time.Duration) error {
121-
err := c.cache.Update(func(txn *badger.Txn) error {
122-
for _, key := range keys {
123-
entry := badger.NewEntry([]byte(key), []byte{})
124-
if ttl > 0 {
125-
entry = entry.WithTTL(ttl)
126-
}
127-
if err := txn.SetEntry(entry); err != nil {
128-
return fmt.Errorf("failed to put key %s: %w", key, err)
137+
itemsByHashRange, err := c.hasher.GetKeysByHashRange(keys)
138+
if err != nil {
139+
return fmt.Errorf("cache put keys: %w", err)
140+
}
141+
142+
err = c.cache.Update(func(txn *badger.Txn) error {
143+
for hashRange, keys := range itemsByHashRange {
144+
for _, key := range keys {
145+
entry := badger.NewEntry(c.getKey(key, hashRange), []byte{})
146+
if ttl > 0 {
147+
entry = entry.WithTTL(ttl)
148+
}
149+
if err := txn.SetEntry(entry); err != nil {
150+
return fmt.Errorf("failed to put key %s: %w", key, err)
151+
}
129152
}
130153
}
131154
return nil
132155
})
133156
if err != nil {
134157
return err
135158
}
159+
136160
return nil
137161
}
138162

139163
// Len returns the number of elements in the cache
140164
// WARNING: this must be used in tests only TODO protect this with a testMode=false by default
141165
func (c *Cache) Len() int {
166+
if !c.debugMode {
167+
panic("Len() is only available in debug mode")
168+
}
169+
142170
count := 0
143171
err := c.cache.View(func(txn *badger.Txn) error {
144172
opts := badger.DefaultIteratorOptions
@@ -160,6 +188,10 @@ func (c *Cache) Len() int {
160188
// String returns a string representation of the cache
161189
// WARNING: this must be used in tests only TODO protect this with a testMode=false by default
162190
func (c *Cache) String() string {
191+
if !c.debugMode {
192+
panic("String() is only available in debug mode")
193+
}
194+
163195
sb := strings.Builder{}
164196
sb.WriteString("{")
165197

@@ -191,7 +223,7 @@ func (c *Cache) String() string {
191223
}
192224

193225
// CreateSnapshot writes the cache contents to the provided writer
194-
func (c *Cache) CreateSnapshot(w io.Writer) (uint64, error) {
226+
func (c *Cache) CreateSnapshot(ctx context.Context, w map[uint32]io.Writer) (uint64, error) {
195227
c.snapshottingLock.Lock()
196228
if c.snapshotting {
197229
c.snapshottingLock.Unlock()
@@ -204,20 +236,142 @@ func (c *Cache) CreateSnapshot(w io.Writer) (uint64, error) {
204236

205237
// TODO gzip should be configurable, maybe we want to use another algorithm with a different compression level
206238
if c.compress {
207-
w = gzip.NewWriter(w)
208-
defer func() { _ = w.(*gzip.Writer).Close() }()
239+
for hashRange, writer := range w {
240+
w[hashRange] = gzip.NewWriter(writer)
241+
}
242+
defer func() {
243+
for _, writer := range w {
244+
err := writer.(*gzip.Writer).Close()
245+
if err != nil {
246+
c.logger.Errorn("failed to close gzip writer", obskit.Error(err))
247+
}
248+
}
249+
}()
209250
}
210251

211-
newSince, err := c.cache.Backup(w, since)
252+
hashRangesMap := make(map[uint32]struct{})
253+
for hashRange := range w {
254+
hashRangesMap[hashRange] = struct{}{}
255+
}
256+
257+
var (
258+
maxVersion uint64
259+
keysToDelete = make([][]byte, 0)
260+
keysToDeleteMu = sync.Mutex{}
261+
)
262+
stream := c.cache.NewStream()
263+
stream.NumGo = c.conf.GetInt("BadgerDB.Dedup.Snapshots.NumGoroutines", 10)
264+
stream.Prefix = []byte("hr")
265+
stream.SinceTs = since
266+
stream.ChooseKey = func(item *badger.Item) (ok bool) {
267+
hasExpired := item.ExpiresAt() > 0 && item.ExpiresAt() <= uint64(time.Now().Unix())
268+
if !hasExpired {
269+
parts := bytes.Split(item.Key(), []byte(":"))
270+
hashRange, _ := strconv.ParseUint(string(parts[0][2:]), 10, 32)
271+
_, ok = hashRangesMap[uint32(hashRange)]
272+
}
273+
if hasExpired || !ok {
274+
keysToDeleteMu.Lock()
275+
keysToDelete = append(keysToDelete, item.Key())
276+
keysToDeleteMu.Unlock()
277+
return false
278+
}
279+
return true
280+
}
281+
stream.Send = func(buf *z.Buffer) error {
282+
list, err := badger.BufferToKVList(buf)
283+
if err != nil {
284+
return err
285+
}
286+
287+
// Group KV pairs by hash range
288+
kvsByHashRange := make(map[uint32][]*pb.KV)
289+
290+
for _, kv := range list.Kv {
291+
if maxVersion < kv.Version {
292+
maxVersion = kv.Version
293+
}
294+
if !kv.StreamDone {
295+
// Extract the hash range from the key.
296+
// Key format is "hr<hashRange>:<actualKey>".
297+
parts := bytes.Split(kv.Key, []byte(":"))
298+
if len(parts) < 2 {
299+
c.logger.Warnn("Skipping malformed key", logger.NewStringField("key", string(kv.Key)))
300+
continue // Skip malformed keys
301+
}
302+
303+
hashRangeStr := string(parts[0][2:]) // Remove "hr" prefix
304+
hashRange, err := strconv.ParseUint(hashRangeStr, 10, 32)
305+
if err != nil {
306+
c.logger.Warnn("Skipping key with invalid hash range", logger.NewStringField("key", string(kv.Key)))
307+
continue // Skip keys with invalid hash range
308+
}
309+
310+
hashRange32 := uint32(hashRange)
311+
312+
// Only include if we have a writer for this hash range
313+
if _, exists := w[hashRange32]; exists {
314+
kvsByHashRange[hashRange32] = append(kvsByHashRange[hashRange32], kv)
315+
}
316+
}
317+
}
318+
319+
// Write to appropriate writers by hash range
320+
for hashRange, kvs := range kvsByHashRange {
321+
writer := w[hashRange]
322+
323+
// Create a new KVList for this hash range
324+
rangeList := &pb.KVList{Kv: kvs}
325+
326+
if err := writeTo(rangeList, writer); err != nil {
327+
return fmt.Errorf("failed to write to hash range %d: %w", hashRange, err)
328+
}
329+
}
330+
331+
return nil
332+
}
333+
334+
err := stream.Orchestrate(ctx)
212335
if err != nil {
213336
return 0, fmt.Errorf("failed to create snapshot: %w", err)
214337
}
215338

216339
c.snapshottingLock.Lock()
217-
c.snapshotSince = newSince
340+
c.snapshotSince = maxVersion
218341
c.snapshotting = false
219342
c.snapshottingLock.Unlock()
220343

344+
if len(keysToDelete) > 0 {
345+
batchSize := c.conf.GetInt("BadgerDB.Dedup.Snapshots.DeleteBatchSize", 1000)
346+
keysToDeleteBatch := make([][]byte, 0, batchSize)
347+
deleteKeys := func() error {
348+
err := c.cache.Update(func(txn *badger.Txn) error {
349+
for _, key := range keysToDeleteBatch {
350+
return txn.Delete(key)
351+
}
352+
return nil
353+
})
354+
if err != nil {
355+
return fmt.Errorf("failed to delete keys: %w", err)
356+
}
357+
keysToDeleteBatch = make([][]byte, 0, batchSize)
358+
return nil
359+
}
360+
for _, key := range keysToDelete {
361+
keysToDeleteBatch = append(keysToDeleteBatch, key)
362+
if len(keysToDeleteBatch) == batchSize {
363+
if err := deleteKeys(); err != nil {
364+
return 0, fmt.Errorf("failed to delete keys: %w", err)
365+
}
366+
}
367+
}
368+
if len(keysToDeleteBatch) > 0 {
369+
if err := deleteKeys(); err != nil {
370+
return 0, fmt.Errorf("failed to delete keys: %w", err)
371+
}
372+
}
373+
}
374+
221375
return since, nil
222376
}
223377

@@ -247,6 +401,22 @@ func (c *Cache) Close() error {
247401
return c.cache.Close()
248402
}
249403

404+
func (c *Cache) getKey(key string, hashRange uint32) []byte {
405+
return []byte("hr" + strconv.Itoa(int(hashRange)) + ":" + key)
406+
}
407+
408+
func writeTo(list *pb.KVList, w io.Writer) error {
409+
if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
410+
return err
411+
}
412+
buf, err := proto.Marshal(list)
413+
if err != nil {
414+
return err
415+
}
416+
_, err = w.Write(buf)
417+
return err
418+
}
419+
250420
type loggerForBadger struct {
251421
logger.Logger
252422
}

0 commit comments

Comments
 (0)