Skip to content

Commit e37dab9

Browse files
authored
chore: snapshots and scaling operations (#18)
* fix: snapshot already in progress error * chore: one stream per hash range * chore: populating since during startup * feat: selected snapshots * feat: create snapshots full sync capabilities * chore: deleting old files before proceeding to next hash range * chore: load snapshot to support selected hash ranges * chore: scale does not download snapshots anymore * chore: [email protected] * chore: operator API to call single nodes * chore: operator and client split * test: operator server and hashing * chore: operator auto scale * chore: use clusterSize for consistency * chore: simplifying movements logic * chore: /hashRangeMovements operator endpoint * chore: /hashRangeMovements can upload * test: file assertions in operator * chore: use go-kit minio container * chore: split uploads * chore: removing TODO * chore: using default properly * chore: clean up redundant method checks and parallelize snapshot operations with errgroup * chore: use GetHashRangeMovements for scaling down as well * chore: adding comment * chore: populate since map regardless of download flag * chore: adding test for since map population on start-up
1 parent 47a9f62 commit e37dab9

File tree

20 files changed

+2986
-935
lines changed

20 files changed

+2986
-935
lines changed

.github/workflows/verify.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,5 @@ jobs:
4949
- name: golangci-lint
5050
uses: golangci/golangci-lint-action@v8
5151
with:
52-
version: v2.1.6
52+
version: v2.3.1
5353
args: -v

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ GO := go
22
TESTFILE := _testok
33
DOCKER_USER :=
44

5-
GOLANG_CI := github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.1.6
5+
GOLANG_CI := github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.3.1
66
GOFUMPT := mvdan.cc/gofumpt@latest
77
GOVULNCHECK := golang.org/x/vuln/cmd/govulncheck@latest
88
GOIMPORTS := golang.org/x/tools/cmd/goimports@latest
@@ -127,4 +127,4 @@ fmt: install-tools ## Formats all go
127127
./build/docker-go-version.sh Dockerfile-operator
128128

129129
mocks: install-tools ## Generate all mocks
130-
$(GO) generate ./...
130+
$(GO) generate ./...

client/client.go

Lines changed: 2 additions & 279 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,15 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"github.com/rudderlabs/rudder-go-kit/stats"
12-
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
13-
1411
"google.golang.org/grpc"
1512
"google.golang.org/grpc/credentials/insecure"
1613

1714
"github.com/rudderlabs/keydb/internal/hash"
1815
pb "github.com/rudderlabs/keydb/proto"
1916
"github.com/rudderlabs/rudder-go-kit/logger"
17+
"github.com/rudderlabs/rudder-go-kit/stats"
2018
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
19+
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
2120
)
2221

2322
var (
@@ -474,282 +473,6 @@ func (c *Client) put(ctx context.Context, keys []string, ttl time.Duration) erro
474473
return nil
475474
}
476475

477-
// GetNodeInfo returns information about a node
478-
func (c *Client) GetNodeInfo(ctx context.Context, nodeID uint32) (*pb.GetNodeInfoResponse, error) {
479-
c.mu.RLock()
480-
defer c.mu.RUnlock()
481-
482-
// Get the client for this node
483-
client, ok := c.clients[int(nodeID)]
484-
if !ok {
485-
// this should never happen unless clusterSize is updated and the c.clients map isn't
486-
// or if there is a bug in the hashing function
487-
return nil, fmt.Errorf("no client for node %d", nodeID)
488-
}
489-
490-
// Create the request
491-
req := &pb.GetNodeInfoRequest{NodeId: nodeID}
492-
493-
// Send the request with retries
494-
var err error
495-
var resp *pb.GetNodeInfoResponse
496-
for i := 0; i <= c.config.RetryCount; i++ {
497-
resp, err = client.GetNodeInfo(ctx, req)
498-
if err == nil {
499-
break
500-
}
501-
502-
// If this is the last retry, return the error
503-
if i == c.config.RetryCount {
504-
return nil, fmt.Errorf("failed to get node info from node %d: %w", nodeID, err)
505-
}
506-
507-
// Wait before retrying
508-
select {
509-
case <-ctx.Done():
510-
return nil, ctx.Err()
511-
case <-time.After(c.config.RetryDelay):
512-
}
513-
}
514-
515-
if c.clusterSize != resp.ClusterSize {
516-
if err = c.updateClusterSize(resp.NodesAddresses); err != nil {
517-
return nil, fmt.Errorf("failed to update cluster size: %w", err)
518-
}
519-
}
520-
521-
return resp, nil
522-
}
523-
524-
// CreateSnapshots forces the creation of snapshots on a node
525-
// This method is meant to be used by an Operator process only!
526-
func (c *Client) CreateSnapshots(ctx context.Context) error {
527-
c.mu.RLock()
528-
defer c.mu.RUnlock()
529-
530-
group, ctx := kitsync.NewEagerGroup(ctx, len(c.clients))
531-
for nodeID, client := range c.clients {
532-
group.Go(func() error {
533-
req := &pb.CreateSnapshotsRequest{}
534-
535-
var err error
536-
var resp *pb.CreateSnapshotsResponse
537-
for i := 0; i <= c.config.RetryCount; i++ {
538-
resp, err = client.CreateSnapshots(ctx, req)
539-
if err == nil {
540-
break
541-
}
542-
543-
// If this is the last retry, return the error
544-
if i == c.config.RetryCount {
545-
return fmt.Errorf("failed to create snapshot on node %d: %w", nodeID, err)
546-
}
547-
548-
// Wait before retrying
549-
select {
550-
case <-ctx.Done():
551-
return ctx.Err()
552-
case <-time.After(c.config.RetryDelay):
553-
}
554-
}
555-
556-
if !resp.Success {
557-
return fmt.Errorf("failed to create snapshot on node %d: %w", nodeID, err)
558-
}
559-
560-
return nil
561-
})
562-
}
563-
564-
return group.Wait()
565-
}
566-
567-
// LoadSnapshots forces all nodes to load snapshots from cloud storage
568-
// This method is meant to be used by an Operator process only!
569-
func (c *Client) LoadSnapshots(ctx context.Context) error {
570-
c.mu.RLock()
571-
defer c.mu.RUnlock()
572-
573-
group, ctx := kitsync.NewEagerGroup(ctx, len(c.clients))
574-
for nodeID, client := range c.clients {
575-
group.Go(func() error {
576-
req := &pb.LoadSnapshotsRequest{}
577-
578-
var err error
579-
var resp *pb.LoadSnapshotsResponse
580-
for i := 0; i <= c.config.RetryCount; i++ {
581-
resp, err = client.LoadSnapshots(ctx, req)
582-
if err == nil && resp != nil && resp.Success {
583-
break
584-
}
585-
586-
// If this is the last retry, return the error
587-
if i == c.config.RetryCount {
588-
errMsg := "unknown error"
589-
if err != nil {
590-
errMsg = err.Error()
591-
} else if resp != nil {
592-
errMsg = resp.ErrorMessage
593-
}
594-
return fmt.Errorf("failed to load snapshots on node %d: %s", nodeID, errMsg)
595-
}
596-
597-
// Wait before retrying
598-
select {
599-
case <-ctx.Done():
600-
return ctx.Err()
601-
case <-time.After(c.config.RetryDelay):
602-
}
603-
}
604-
605-
return nil
606-
})
607-
}
608-
609-
return group.Wait()
610-
}
611-
612-
// Scale changes the number of nodes in the cluster
613-
// This method is meant to be used by an Operator process only!
614-
func (c *Client) Scale(ctx context.Context, addresses ...string) error {
615-
c.mu.Lock()
616-
defer c.mu.Unlock()
617-
618-
newClusterSize := uint32(len(addresses))
619-
if newClusterSize == c.clusterSize {
620-
return nil // No change needed
621-
}
622-
623-
// Handle case when newClusterSize is bigger
624-
if newClusterSize > c.clusterSize {
625-
// Establish new connections to the new nodes
626-
for i := int(c.clusterSize); i < int(newClusterSize); i++ {
627-
addr := addresses[i]
628-
conn, err := grpc.NewClient(addr,
629-
grpc.WithTransportCredentials(insecure.NewCredentials()),
630-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
631-
var dialer net.Dialer
632-
return dialer.DialContext(ctx, "tcp", addr)
633-
}),
634-
)
635-
if err != nil {
636-
// Close any new connections we've made so far
637-
for j := int(c.clusterSize); j < i; j++ {
638-
if conn, ok := c.connections[j]; ok {
639-
_ = conn.Close()
640-
delete(c.connections, j)
641-
delete(c.clients, j)
642-
}
643-
}
644-
return fmt.Errorf("failed to connect to node %d at %s: %w", i, addr, err)
645-
}
646-
647-
c.connections[i] = conn
648-
c.clients[i] = pb.NewNodeServiceClient(conn)
649-
}
650-
} else if newClusterSize < c.clusterSize {
651-
// Handle case when newClusterSize is smaller
652-
// Close unnecessary connections
653-
for i := int(newClusterSize); i < int(c.clusterSize); i++ {
654-
if conn, ok := c.connections[i]; ok {
655-
_ = conn.Close() // Ignore errors during close
656-
delete(c.connections, i)
657-
delete(c.clients, i)
658-
}
659-
}
660-
}
661-
662-
// Send ScaleRequest to all nodes
663-
group, ctx := kitsync.NewEagerGroup(ctx, len(c.clients))
664-
for nodeID, client := range c.clients {
665-
group.Go(func() error {
666-
req := &pb.ScaleRequest{
667-
NewClusterSize: newClusterSize,
668-
NodesAddresses: addresses,
669-
}
670-
671-
var err error
672-
var resp *pb.ScaleResponse
673-
for i := 0; i <= c.config.RetryCount; i++ {
674-
resp, err = client.Scale(ctx, req)
675-
if err == nil && resp != nil && resp.Success {
676-
break
677-
}
678-
679-
// If this is the last retry, save the error
680-
if i == c.config.RetryCount {
681-
errMsg := "unknown error"
682-
if err != nil {
683-
errMsg = err.Error()
684-
} else if resp != nil {
685-
errMsg = resp.ErrorMessage
686-
}
687-
return fmt.Errorf("failed to scale node %d: %s", nodeID, errMsg)
688-
}
689-
690-
// Wait before retrying
691-
select {
692-
case <-ctx.Done():
693-
return ctx.Err()
694-
case <-time.After(c.config.RetryDelay):
695-
}
696-
}
697-
698-
return nil
699-
})
700-
}
701-
702-
err := group.Wait()
703-
if err != nil {
704-
return err
705-
}
706-
707-
c.config.Addresses = addresses
708-
c.clusterSize = newClusterSize
709-
710-
return nil
711-
}
712-
713-
// ScaleComplete notifies a node that the scaling operation is complete
714-
// This method is meant to be used by an Operator process only!
715-
func (c *Client) ScaleComplete(ctx context.Context) error {
716-
c.mu.RLock()
717-
defer c.mu.RUnlock()
718-
719-
group, ctx := kitsync.NewEagerGroup(ctx, len(c.clients))
720-
for nodeID, client := range c.clients {
721-
group.Go(func() error {
722-
req := &pb.ScaleCompleteRequest{}
723-
724-
// Send the request with retries
725-
var err error
726-
var resp *pb.ScaleCompleteResponse
727-
for i := 0; i <= c.config.RetryCount; i++ {
728-
resp, err = client.ScaleComplete(ctx, req)
729-
if err == nil && resp != nil && resp.Success {
730-
break
731-
}
732-
733-
// If this is the last retry, return the error
734-
if i == c.config.RetryCount {
735-
return fmt.Errorf("failed to complete scale operation on node %d: %w", nodeID, err)
736-
}
737-
738-
// Wait before retrying
739-
select {
740-
case <-ctx.Done():
741-
return ctx.Err()
742-
case <-time.After(c.config.RetryDelay):
743-
}
744-
}
745-
746-
return nil
747-
})
748-
}
749-
750-
return group.Wait()
751-
}
752-
753476
// updateClusterSize updates the cluster size in a race-condition safe manner.
754477
// It takes a new cluster size and the current keys being processed.
755478
// It returns a slice of keys that need to be fetched again.

cmd/node/benchmark_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,5 @@ func (m *mockedCloudStorage) ListFilesWithPrefix(_ context.Context, _, _ string,
158158
func (m *mockedCloudStorage) UploadReader(_ context.Context, _ string, _ io.Reader) (filemanager.UploadedFile, error) {
159159
return filemanager.UploadedFile{}, nil
160160
}
161+
162+
func (m *mockedCloudStorage) Delete(_ context.Context, _ []string) error { return nil }

cmd/operator/main.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/rudderlabs/keydb/client"
15+
"github.com/rudderlabs/keydb/internal/operator"
1516
"github.com/rudderlabs/rudder-go-kit/config"
1617
"github.com/rudderlabs/rudder-go-kit/logger"
1718
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
@@ -43,7 +44,7 @@ func run(ctx context.Context, cancel func(), conf *config.Config, log logger.Log
4344
Addresses: strings.Split(nodeAddresses, ","),
4445
TotalHashRanges: uint32(conf.GetInt("totalHashRanges", int(client.DefaultTotalHashRanges))),
4546
RetryCount: conf.GetInt("retryCount", client.DefaultRetryCount),
46-
RetryDelay: conf.GetDuration("retryDelay", 0, time.Nanosecond), // client.DefaultRetryDelay will be used
47+
RetryDelay: conf.GetDuration("retryDelay", 0, time.Second), // client.DefaultRetryDelay will be used
4748
}
4849
c, err := client.NewClient(clientConfig, log.Child("client"))
4950
if err != nil {
@@ -54,6 +55,21 @@ func run(ctx context.Context, cancel func(), conf *config.Config, log logger.Log
5455
log.Warnn("Failed to close client", obskit.Error(err))
5556
}
5657
}()
58+
opRetryDelay := conf.GetDuration("operatorRetryDelay", int64(client.DefaultRetryDelay), time.Second)
59+
op, err := operator.NewClient(operator.Config{
60+
Addresses: strings.Split(nodeAddresses, ","),
61+
TotalHashRanges: uint32(conf.GetInt("totalHashRanges", int(client.DefaultTotalHashRanges))),
62+
RetryCount: conf.GetInt("operatorRetryCount", client.DefaultRetryCount),
63+
RetryDelay: opRetryDelay,
64+
}, log.Child("operator"))
65+
if err != nil {
66+
return fmt.Errorf("failed to create operator: %w", err)
67+
}
68+
defer func() {
69+
if err := op.Close(); err != nil {
70+
log.Warnn("Failed to close operator", obskit.Error(err))
71+
}
72+
}()
5773

5874
log.Infon("Starting operator",
5975
logger.NewIntField("totalHashRanges", int64(clientConfig.TotalHashRanges)),
@@ -68,7 +84,7 @@ func run(ctx context.Context, cancel func(), conf *config.Config, log logger.Log
6884

6985
// Create and start HTTP server
7086
serverAddr := conf.GetString("serverAddr", ":8080")
71-
server := newHTTPServer(c, serverAddr)
87+
server := newHTTPServer(c, op, serverAddr, log)
7288

7389
// Start server in a goroutine
7490
serverErrCh := make(chan error, 1)

0 commit comments

Comments (0)