
Commit 64bb247

chore: garbage collection
1 parent 541bd30 commit 64bb247

File tree

5 files changed (+84, -6 lines)


cmd/node/config.go

Lines changed: 6 additions & 1 deletion
@@ -9,4 +9,9 @@ var defaultHistogramBuckets = []float64{
     300 /* 5 mins */, 600 /* 10 mins */, 1800, /* 30 mins */
 }
 
-var customBuckets = map[string][]float64{}
+var customBuckets = map[string][]float64{
+    "keydb_gc_duration_seconds": {
+        // 2ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 20s, 30s, 1m, 5m, 10m, 30m
+        0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30, 60, 300, 600, 1800,
+    },
+}
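Editor's note: these buckets give the new GC histogram resolution from 2ms up to 30 minutes. For illustration only, here is a minimal sketch of how boundaries like these plug into a plain Prometheus histogram; the repo wires them through its own stats package instead, so prometheus/client_golang and the names below are assumptions, not the project's actual wiring:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // Hypothetical stand-in for the repo's stats wiring.
    gcDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "keydb_gc_duration_seconds",
        Help: "Duration of garbage collection runs.",
        // Same boundaries as customBuckets above: 2ms up to 30m.
        Buckets: []float64{0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30, 60, 300, 600, 1800},
    })
    prometheus.MustRegister(gcDuration)

    start := time.Now()
    // ... garbage collection would run here ...
    gcDuration.Observe(time.Since(start).Seconds())
}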

cmd/node/main.go

Lines changed: 1 addition & 0 deletions
@@ -101,6 +101,7 @@ func run(ctx context.Context, cancel func(), conf *config.Config, stat stats.Sta
         logger.NewIntField("clusterSize", int64(nodeConfig.ClusterSize)),
         logger.NewIntField("totalHashRanges", int64(nodeConfig.TotalHashRanges)),
         logger.NewDurationField("snapshotInterval", nodeConfig.SnapshotInterval),
+        logger.NewDurationField("gcInterval", nodeConfig.GarbageCollectionInterval),
         logger.NewStringField("nodeAddresses", fmt.Sprintf("%+v", nodeConfig.Addresses)),
         logger.NewIntField("noOfAddresses", int64(len(nodeConfig.Addresses))),
     )

internal/cache/badger/badger.go

Lines changed: 12 additions & 2 deletions
@@ -24,6 +24,7 @@ var ErrSnapshotInProgress = errors.New("snapshotting already in progress")
 type Cache struct {
     cache            *badger.DB
     compress         bool
+    discardRatio     float64
     snapshotSince    uint64
     snapshotting     bool
     snapshottingLock sync.Mutex
@@ -83,8 +84,9 @@ func New(path string, conf *config.Config, log logger.Logger) (*Cache, error) {
         return nil, err
     }
     return &Cache{
-        cache:    db,
-        compress: compress,
+        cache:        db,
+        compress:     compress,
+        discardRatio: conf.GetFloat64("BadgerDB.Dedup.DiscardRatio", 0.7),
     }, nil
 }
 
@@ -233,6 +235,14 @@ func (c *Cache) LoadSnapshot(r io.Reader) error {
     return c.cache.Load(r, 16)
 }
 
+func (c *Cache) RunGarbageCollection() {
+again: // see https://dgraph.io/docs/badger/get-started/#garbage-collection
+    err := c.cache.RunValueLogGC(c.discardRatio)
+    if err == nil {
+        goto again
+    }
+}
+
 func (c *Cache) Close() error {
     return c.cache.Close()
 }
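Editor's note on the loop above: Badger's RunValueLogGC rewrites at most one value-log file per call, only rewrites a file when at least the discard ratio (here 0.7) of it is stale, and returns an error (typically badger.ErrNoRewrite) once nothing qualifies, which is why the method repeats while err == nil. A standalone sketch of the same pattern, assuming the badger/v4 import path (the repo may pin a different major version):

package main

import (
    "errors"
    "log"

    badger "github.com/dgraph-io/badger/v4"
)

func main() {
    db, err := badger.Open(badger.DefaultOptions("/tmp/badger-gc-demo"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Each call rewrites at most one value-log file, so repeat until
    // Badger reports there is nothing left worth rewriting.
    for {
        if err := db.RunValueLogGC(0.7); err != nil {
            if !errors.Is(err, badger.ErrNoRewrite) {
                log.Printf("value log GC stopped: %v", err)
            }
            break
        }
    }
}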

internal/cache/cachettl/cachettl.go

Lines changed: 4 additions & 0 deletions
@@ -59,6 +59,10 @@ func (c *Cache) LoadSnapshot(r io.Reader) error {
     return cachettl.LoadSnapshot(r, c.cache)
 }
 
+func (c *Cache) RunGarbageCollection() {
+    // no need to implement since we won't be using cachettl
+}
+
 // Close releases any resources held by the cache and performs any necessary cleanup.
 func (c *Cache) Close() error { // TODO implementation
     return nil

node/node.go

Lines changed: 61 additions & 3 deletions
@@ -31,6 +31,9 @@ const (
     // DefaultSnapshotInterval is the default interval for creating snapshots (in seconds)
     DefaultSnapshotInterval = 24 * time.Hour
 
+    // DefaultGarbageCollectionInterval defines how often garbage collection should happen per cache
+    DefaultGarbageCollectionInterval = 5 * time.Minute
+
     // DefaultMaxFilesToList defines the default maximum number of files to list in a single operation, set to 1000.
     DefaultMaxFilesToList int64 = 1000
 )
@@ -54,6 +57,9 @@ type Config struct {
     // SnapshotInterval is the interval for creating snapshots (in seconds)
     SnapshotInterval time.Duration
 
+    // GarbageCollectionInterval defines the duration between automatic GC operations per cache
+    GarbageCollectionInterval time.Duration
+
     // Addresses is a list of node addresses that this node will advertise to clients
     Addresses []string
 }
@@ -84,6 +90,7 @@ type Service struct {
         errScalingCounter   stats.Counter
         errWrongNodeCounter stats.Counter
         errInternalCounter  stats.Counter
+        gcDuration          stats.Histogram
     }
 }
 
@@ -102,6 +109,9 @@ type Cache interface {
     // LoadSnapshot reads the cache contents from the provided reader
     LoadSnapshot(r io.Reader) error
 
+    // RunGarbageCollection performs GC while the cache remains online
+    RunGarbageCollection()
+
     // Close releases any resources associated with the cache and ensures proper cleanup. Returns an error if the operation fails.
     Close() error
 }
@@ -130,11 +140,12 @@ func NewService(
     if config.TotalHashRanges == 0 {
         config.TotalHashRanges = DefaultTotalHashRanges
     }
-
     if config.SnapshotInterval == 0 {
         config.SnapshotInterval = DefaultSnapshotInterval
     }
-
+    if config.GarbageCollectionInterval == 0 {
+        config.GarbageCollectionInterval = DefaultGarbageCollectionInterval
+    }
     if config.MaxFilesToList == 0 {
         config.MaxFilesToList = DefaultMaxFilesToList
     }
@@ -150,7 +161,9 @@ func NewService(
     logger: log.Withn(
         logger.NewIntField("nodeId", int64(config.NodeID)),
         logger.NewIntField("totalHashRanges", int64(config.TotalHashRanges)),
-        logger.NewIntField("snapshotInterval", int64(config.SnapshotInterval.Seconds())),
+        logger.NewDurationField("snapshotInterval", config.SnapshotInterval),
+        logger.NewDurationField("garbageCollectionInterval", config.GarbageCollectionInterval),
+        logger.NewIntField("maxFilesToList", config.MaxFilesToList),
     ),
 }
 
@@ -160,6 +173,7 @@ func NewService(
     service.metrics.errInternalCounter = stat.NewTaggedStat("keydb_err_internal_count", stats.CountType, statsTags)
     service.metrics.getKeysCounters = make(map[uint32]stats.Counter)
     service.metrics.putKeysCounter = make(map[uint32]stats.Counter)
+    service.metrics.gcDuration = stat.NewTaggedStat("keydb_gc_duration_seconds", stats.HistogramType, statsTags)
 
     // Initialize caches for all hash ranges this node handles
     if err := service.initCaches(ctx); err != nil {
@@ -169,6 +183,8 @@
     // Start background snapshot creation
     service.waitGroup.Add(1)
     go service.snapshotLoop(ctx)
+    service.waitGroup.Add(1)
+    go service.garbageCollection(ctx)
 
     return service, nil
 }
@@ -202,6 +218,7 @@ func (s *Service) snapshotLoop(ctx context.Context) {
     }
     if s.now().Sub(s.lastSnapshotTime) < s.config.SnapshotInterval { // TODO write a test for this
         // we already created a snapshot recently due to a scaling operation
+        s.logger.Debugn("Skipping snapshot due to scaling operation")
         return
     }
     if err := s.createSnapshots(ctx); err != nil {
@@ -212,6 +229,46 @@ func (s *Service) snapshotLoop(ctx context.Context) {
     }
 }
 
+func (s *Service) garbageCollection(ctx context.Context) {
+    defer s.waitGroup.Done()
+
+    ticker := time.NewTicker(s.config.GarbageCollectionInterval)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            func() {
+                s.mu.Lock() // TODO this might affect scaling operations
+                defer s.mu.Unlock()
+
+                if s.scaling {
+                    s.logger.Warnn("Skipping garbage collection while scaling")
+                    return
+                }
+
+                start := time.Now()
+                s.logger.Infon("Running garbage collection", logger.NewIntField("noOfCaches", int64(len(s.caches))))
+                defer func() {
+                    elapsed := time.Since(start)
+                    s.logger.Infon("Garbage collection finished",
+                        logger.NewIntField("noOfCaches", int64(len(s.caches))),
+                        logger.NewDurationField("duration", elapsed),
+                    )
+                    s.metrics.gcDuration.Observe(elapsed.Seconds())
+                }()
+
+                for _, cache := range s.caches {
+                    // For now let's just run one GC at a time to avoid overwhelming the CPU
+                    cache.RunGarbageCollection()
+                }
+            }()
+        }
+    }
+}
+
 // initCaches initializes the caches for all hash ranges this node handles
 func (s *Service) initCaches(ctx context.Context) error {
     // Get hash ranges for this node
@@ -542,6 +599,7 @@ func (s *Service) ScaleComplete(_ context.Context, _ *pb.ScaleCompleteRequest) (
 }
 
 // CreateSnapshot implements the CreateSnapshot RPC method
+// TODO should we trigger a Garbage Collection before taking a snapshot?
 // TODO this can be optimized a lot! For example we could snapshot on local disk every 10s and work only on the head
 // and tail of the file (i.e. remove expired from head and append new entries).
 // Then once a minute we can upload the whole file to S3.
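Editor's note: GarbageCollectionInterval behaves like the other Config fields, so leaving it zero falls back to the 5-minute default in NewService. A toy sketch of that fallback; the struct below is trimmed to the relevant fields and is not the full node.Config:

package main

import (
    "fmt"
    "time"
)

// Trimmed stand-in for node.Config; the real struct has more fields.
type Config struct {
    SnapshotInterval          time.Duration
    GarbageCollectionInterval time.Duration
}

func main() {
    conf := Config{SnapshotInterval: 24 * time.Hour} // GC interval left at zero
    if conf.GarbageCollectionInterval == 0 {
        // Mirrors the fallback added in NewService above.
        conf.GarbageCollectionInterval = 5 * time.Minute
    }
    fmt.Println(conf.GarbageCollectionInterval) // 5m0s
}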
