Skip to content

Commit 9059c4c

Browse files
committed
Synchronize importers with server regularly to avoid entropy.
Entropy can be generated while performing a migration while keeping the service running.
1 parent 16f479e commit 9059c4c

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

internal/client/graph_api.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,26 @@ package client
22

33
import (
44
"fmt"
5+
"math/rand"
56
"time"
67

78
"github.com/clems4ever/go-graphkb/internal/knowledge"
89
"github.com/sirupsen/logrus"
910
)
1011

12+
func init() {
13+
rand.Seed(time.Now().UnixNano())
14+
}
15+
1116
// GraphAPI represent the graph API from a data source point of view
1217
type GraphAPI struct {
1318
client *GraphClient
1419

1520
options GraphAPIOptions
1621

1722
currentGraph *knowledge.Graph
23+
// Stores the date after which the graph will be considered stale
24+
currentGraphStaleAfter time.Time
1825
}
1926

2027
// GraphAPIOptions options to pass to build graph API
@@ -39,6 +46,11 @@ type GraphAPIOptions struct {
3946

4047
// The backoff factor (default is 1.01)
4148
RetryBackoffFactor float64
49+
50+
// The API stores the lastly pushed graph in memory to avoid fetching the entire data at every run which can be heavy on
51+
// DB if importers run very incrementally. The anti entropy duration is a duration before forcing a synchronization against
52+
// the server even though the graph is still stored in memory.
53+
AntiEntropyDuration time.Duration
4254
}
4355

4456
// NewGraphAPI create an emitter of graph
@@ -52,13 +64,17 @@ func NewGraphAPI(options GraphAPIOptions) *GraphAPI {
5264
// CreateTransaction create a full graph transaction. This kind of transaction will diff the new graph
5365
// with previous version of it.
5466
func (gapi *GraphAPI) CreateTransaction() (*Transaction, error) {
55-
if gapi.currentGraph == nil {
67+
if gapi.currentGraph == nil || gapi.currentGraphStaleAfter.Before(time.Now()) {
5668
logrus.Debug("transaction: fetching remote graph")
5769
g, err := gapi.ReadCurrentGraph()
5870
if err != nil {
5971
return nil, fmt.Errorf("create transaction: %w", err)
6072
}
6173
gapi.currentGraph = g
74+
75+
quarter := int64(gapi.options.AntiEntropyDuration) / 4
76+
randDelta := time.Duration(rand.Int63n(quarter*2) - quarter)
77+
gapi.currentGraphStaleAfter = time.Now().Add(gapi.options.AntiEntropyDuration).Add(randDelta)
6278
}
6379

6480
var parallelization = gapi.options.Parallelization

0 commit comments

Comments
 (0)