Skip to content

Commit c97ad7a

Browse files
authored
chore: implement configurable deduplication mirror mode (#6224)
# Description

## Overview

Refactored the dedup package to use a single Dedup instance with pluggable database backends instead of creating multiple instances.

## Key Changes

### 1. Core Architecture

- **Before**: Multiple Dedup instances (BadgerDB, KeyDB, Mirror)
- **After**: Single Dedup instance with pluggable DB backends

### 2. Interface Changes

- Added `DB` interface in `types` package for database operations
- Made BadgerDB and KeyDB implement the DB interface
- Created MirrorDB to handle mirroring logic internally

### 3. Main Dedup Structure

```go
// New simplified structure
type Dedup struct {
	db            DB
	uncommittedMu sync.RWMutex
	uncommitted   map[string]struct{}
}
```

### 4. Database Implementations

#### BadgerDB (`services/dedup/badger/badger.go`)

- `NewBadgerDB` now returns `(types.DB, error)` instead of `*Dedup`
- Removed embedded Dedup struct
- Implements DB interface directly

#### KeyDB (`services/dedup/keydb/keydb.go`)

- `NewKeyDB` now returns `types.DB` instead of `*Dedup`
- Removed embedded Dedup struct
- Implements DB interface directly

#### MirrorDB (`services/dedup/mirror.go`)

- New implementation that wraps primary and mirror databases
- Handles asynchronous mirroring operations
- Implements both DB and DedupInterface interfaces

### 5. Configuration Modes

1. `badgerOnlyMode` - BadgerDB only
2. `keyDBOnlyMode` - KeyDB only
3. `mirrorBadgerMode` - BadgerDB primary, KeyDB mirror
4. `mirrorKeyDBMode` - KeyDB primary, BadgerDB mirror

### 6. Test Updates (`services/dedup/dedup_test.go`)

- Updated to use `types.DedupInterface` instead of `dedup.Dedup`
- Modified KeyDB test to use config to select backend
- Preserved all existing test logic and coverage

## Benefits

- Single Dedup instance management
- Cleaner separation of concerns
- Easier testing and maintenance
- Better extensibility for new database backends
- Preserved backward compatibility

## Linear Ticket

pipe-2289

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 8be54bf commit c97ad7a

File tree

11 files changed

+732
-342
lines changed

11 files changed

+732
-342
lines changed

mocks/services/dedup/mock_dedup.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processor.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@ import (
1515
"sync"
1616
"time"
1717

18+
"github.com/rudderlabs/rudder-server/services/dedup"
19+
1820
"github.com/google/uuid"
1921
"github.com/samber/lo"
2022
"github.com/tidwall/gjson"
@@ -46,7 +48,7 @@ import (
4648
"github.com/rudderlabs/rudder-server/rruntime"
4749
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
4850
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
49-
"github.com/rudderlabs/rudder-server/services/dedup"
51+
deduptypes "github.com/rudderlabs/rudder-server/services/dedup/types"
5052
"github.com/rudderlabs/rudder-server/services/fileuploader"
5153
"github.com/rudderlabs/rudder-server/services/rmetrics"
5254
"github.com/rudderlabs/rudder-server/services/rsources"
@@ -113,7 +115,7 @@ type Handle struct {
113115
pendingEventsRegistry rmetrics.PendingEventsRegistry
114116
logger logger.Logger
115117
enrichers []enricher.PipelineEnricher
116-
dedup dedup.Dedup
118+
dedup deduptypes.Dedup
117119
reporting reportingtypes.Reporting
118120
reportingEnabled bool
119121
backgroundWait func() error

services/dedup/badger/badger.go

Lines changed: 42 additions & 126 deletions
Original file line number | Diff line number | Diff line change
@@ -9,29 +9,27 @@ import (
99
"time"
1010

1111
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
12+
"github.com/rudderlabs/rudder-server/rruntime"
1213

1314
"github.com/dgraph-io/badger/v4"
1415
"github.com/dgraph-io/badger/v4/options"
15-
"github.com/samber/lo"
1616

1717
"github.com/rudderlabs/rudder-go-kit/bytesize"
1818
"github.com/rudderlabs/rudder-go-kit/config"
1919
"github.com/rudderlabs/rudder-go-kit/logger"
2020
"github.com/rudderlabs/rudder-go-kit/stats"
2121

22-
"github.com/rudderlabs/rudder-server/rruntime"
2322
"github.com/rudderlabs/rudder-server/services/dedup/types"
2423
"github.com/rudderlabs/rudder-server/utils/misc"
2524
)
2625

27-
type BadgerDB struct {
26+
type badgerDB struct {
2827
logger loggerForBadger
2928
badgerDB *badger.DB
3029
window config.ValueLoader[time.Duration]
3130
path string
3231
opts badger.Options
3332
cleanupOnStartup bool
34-
once sync.Once
3533

3634
wg sync.WaitGroup
3735
bgCtx context.Context
@@ -55,7 +53,7 @@ func DefaultPath() string {
5553
return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName)
5654
}
5755

58-
func NewBadgerDB(conf *config.Config, stat stats.Stats, path string) *Dedup {
56+
func NewBadgerDB(conf *config.Config, stat stats.Stats, path string) (types.DB, error) {
5957
dedupWindow := conf.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS")
6058
log := logger.NewLogger().Child("Dedup")
6159
badgerOpts := badger.
@@ -81,7 +79,7 @@ func NewBadgerDB(conf *config.Config, stat stats.Stats, path string) *Dedup {
8179
WithDetectConflicts(conf.GetBoolVar(false, "BadgerDB.Dedup.detectConflicts", "BadgerDB.detectConflicts"))
8280

8381
bgCtx, cancel := context.WithCancel(context.Background())
84-
db := &BadgerDB{
82+
db := &badgerDB{
8583
logger: loggerForBadger{log},
8684
path: path,
8785
window: dedupWindow,
@@ -96,13 +94,14 @@ func NewBadgerDB(conf *config.Config, stat stats.Stats, path string) *Dedup {
9694
db.stats.vlogSize = stat.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": "dedup", "type": "vlog"})
9795
db.stats.totSize = stat.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": "dedup", "type": "total"})
9896

99-
return &Dedup{
100-
badgerDB: db,
101-
uncommitted: make(map[string]struct{}),
97+
err := db.init()
98+
if err != nil {
99+
return nil, fmt.Errorf("initializing badger db: %w", err)
102100
}
101+
return db, nil
103102
}
104103

105-
func (d *BadgerDB) Get(keys []string) (map[string]bool, error) {
104+
func (d *badgerDB) Get(keys []string) (map[string]bool, error) {
106105
defer d.stats.getTimer.RecordDuration()()
107106
results := make(map[string]bool, len(keys))
108107
err := d.badgerDB.View(func(txn *badger.Txn) error {
@@ -120,7 +119,7 @@ func (d *BadgerDB) Get(keys []string) (map[string]bool, error) {
120119
return results, err
121120
}
122121

123-
func (d *BadgerDB) Set(keys []string) error {
122+
func (d *badgerDB) Set(keys []string) error {
124123
defer d.stats.setTimer.RecordDuration()()
125124
wb := d.badgerDB.NewWriteBatch()
126125
defer wb.Cancel()
@@ -133,57 +132,55 @@ func (d *BadgerDB) Set(keys []string) error {
133132
return wb.Flush()
134133
}
135134

136-
func (d *BadgerDB) Close() {
135+
func (d *badgerDB) Close() {
137136
d.cancel()
138137
d.wg.Wait()
139138
if d.badgerDB != nil {
140139
_ = d.badgerDB.Close()
141140
}
142141
}
143142

144-
func (d *BadgerDB) init() error {
143+
func (d *badgerDB) init() error {
145144
var err error
146-
d.once.Do(func() {
147-
if d.cleanupOnStartup {
148-
if err = os.RemoveAll(d.path); err != nil {
149-
err = fmt.Errorf("removing badger db directory: %w", err)
150-
return
151-
}
145+
if d.cleanupOnStartup {
146+
if err = os.RemoveAll(d.path); err != nil {
147+
err = fmt.Errorf("removing badger db directory: %w", err)
148+
return err
152149
}
153-
openDB := func() (dbase *badger.DB, err error) {
154-
defer func() {
155-
if r := recover(); r != nil {
156-
err = fmt.Errorf("panic during badgerdb open: %v", r)
157-
}
158-
}()
159-
return badger.Open(d.opts)
150+
}
151+
openDB := func() (dbase *badger.DB, err error) {
152+
defer func() {
153+
if r := recover(); r != nil {
154+
err = fmt.Errorf("panic during badgerdb open: %v", r)
155+
}
156+
}()
157+
return badger.Open(d.opts)
158+
}
159+
d.badgerDB, err = openDB()
160+
if err != nil {
161+
// corrupted or incompatible db, clean up the directory and retry
162+
d.logger.Errorn("Error while opening dedup badger db, cleaning up the directory",
163+
obskit.Error(err),
164+
)
165+
if err = os.RemoveAll(d.opts.Dir); err != nil {
166+
err = fmt.Errorf("removing badger db directory: %w", err)
167+
return err
160168
}
161169
d.badgerDB, err = openDB()
162170
if err != nil {
163-
// corrupted or incompatible db, clean up the directory and retry
164-
d.logger.Errorn("Error while opening dedup badger db, cleaning up the directory",
165-
obskit.Error(err),
166-
)
167-
if err = os.RemoveAll(d.opts.Dir); err != nil {
168-
err = fmt.Errorf("removing badger db directory: %w", err)
169-
return
170-
}
171-
d.badgerDB, err = openDB()
172-
if err != nil {
173-
err = fmt.Errorf("opening badger db: %w", err)
174-
return
175-
}
171+
err = fmt.Errorf("opening badger db: %w", err)
172+
return err
176173
}
177-
d.wg.Add(1)
178-
rruntime.Go(func() {
179-
defer d.wg.Done()
180-
d.gcLoop()
181-
})
174+
}
175+
d.wg.Add(1)
176+
rruntime.Go(func() {
177+
defer d.wg.Done()
178+
d.gcLoop()
182179
})
183180
return err
184181
}
185182

186-
func (d *BadgerDB) gcLoop() {
183+
func (d *badgerDB) gcLoop() {
187184
for {
188185
select {
189186
case <-d.bgCtx.Done():
@@ -213,87 +210,6 @@ func (d *BadgerDB) gcLoop() {
213210
}
214211
}
215212

216-
type Dedup struct {
217-
badgerDB *BadgerDB
218-
uncommittedMu sync.RWMutex
219-
uncommitted map[string]struct{}
220-
}
221-
222-
func (d *Dedup) Allowed(batchKeys ...types.BatchKey) (map[types.BatchKey]bool, error) {
223-
if err := d.badgerDB.init(); err != nil {
224-
return nil, fmt.Errorf("initializing badger db: %w", err)
225-
}
226-
result := make(map[types.BatchKey]bool, len(batchKeys)) // keys encountered for the first time
227-
seenInBatch := make(map[string]struct{}, len(batchKeys)) // keys already seen in the batch while iterating
228-
229-
// figure out which keys need to be checked against the DB
230-
batchKeysToCheck := make([]types.BatchKey, 0, len(batchKeys)) // keys to check in the DB
231-
d.uncommittedMu.RLock()
232-
for _, batchKey := range batchKeys {
233-
// if the key is already seen in the batch, skip it
234-
if _, seen := seenInBatch[batchKey.Key]; seen {
235-
continue
236-
}
237-
// if the key is already in the uncommitted list , skip it
238-
if _, uncommitted := d.uncommitted[batchKey.Key]; uncommitted {
239-
seenInBatch[batchKey.Key] = struct{}{}
240-
continue
241-
}
242-
seenInBatch[batchKey.Key] = struct{}{}
243-
batchKeysToCheck = append(batchKeysToCheck, batchKey)
244-
}
245-
d.uncommittedMu.RUnlock()
246-
247-
if len(batchKeysToCheck) > 0 {
248-
seenInDB, err := d.badgerDB.Get(lo.Map(batchKeysToCheck, func(bk types.BatchKey, _ int) string { return bk.Key }))
249-
if err != nil {
250-
return nil, fmt.Errorf("getting keys from badger db: %w", err)
251-
}
252-
d.uncommittedMu.Lock()
253-
defer d.uncommittedMu.Unlock()
254-
for _, batchKey := range batchKeysToCheck {
255-
if !seenInDB[batchKey.Key] {
256-
if _, race := d.uncommitted[batchKey.Key]; !race { // if another goroutine managed to set this key, we should skip it
257-
result[batchKey] = true
258-
d.uncommitted[batchKey.Key] = struct{}{} // mark this key as uncommitted
259-
}
260-
}
261-
}
262-
}
263-
return result, nil
264-
}
265-
266-
func (d *Dedup) Commit(keys []string) error {
267-
if err := d.badgerDB.init(); err != nil {
268-
return fmt.Errorf("initializing badger db: %w", err)
269-
}
270-
kvs := make([]types.BatchKey, len(keys))
271-
d.uncommittedMu.RLock()
272-
for i, key := range keys {
273-
if _, ok := d.uncommitted[key]; !ok {
274-
d.uncommittedMu.RUnlock()
275-
return fmt.Errorf("key %v has not been previously set", key)
276-
}
277-
kvs[i] = types.BatchKey{Key: key}
278-
}
279-
d.uncommittedMu.RUnlock()
280-
281-
if err := d.badgerDB.Set(keys); err != nil {
282-
return fmt.Errorf("setting keys in badger db: %w", err)
283-
}
284-
285-
d.uncommittedMu.Lock()
286-
defer d.uncommittedMu.Unlock()
287-
for _, kv := range kvs {
288-
delete(d.uncommitted, kv.Key)
289-
}
290-
return nil
291-
}
292-
293-
func (d *Dedup) Close() {
294-
d.badgerDB.Close()
295-
}
296-
297213
type loggerForBadger struct {
298214
logger.Logger
299215
}

0 commit comments

Comments
 (0)