Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.work
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go 1.24.5

use (
.
./v3/pkg/util
)
32 changes: 32 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05/go.mod h1:h0h5FBYpXThbvSfTqthw+0I4nmHnhTHkO5BoOHsBWqg=
cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvuXY=
github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/anacrolix/dms v1.1.0/go.mod h1:msPKAoppoNRfrYplJqx63FZ+VipDZ4Xsj3KzIQxyU7k=
github.com/atotto/clipboard v0.1.2/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/billziss-gh/cgofuse v1.4.0/go.mod h1:LJjoaUojlVjgo5GQoEJTcJNqZJeRU0nCR84CyxKt2YM=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/hanwen/go-fuse/v2 v2.0.3/go.mod h1:0EQM6aH2ctVpvZ6a+onrQ/vaykxh2GH7hy3e13vzTUY=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nsf/termbox-go v0.0.0-20200418040025-38ba6e5628f1/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ=
github.com/sevlyar/go-daemon v0.1.5/go.mod h1:6dJpPatBT9eUwM5VCw9Bt6CdX9Tk6UWvhW3MebLDRKE=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
goftp.io/server v0.4.0/go.mod h1:hFZeR656ErRt3ojMKt7H10vQ5nuWV1e0YeUTeorlR6k=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
2 changes: 1 addition & 1 deletion pkg/restapi/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type BackupService interface {

// RestoreService service interface for the REST API handlers.
type RestoreService interface {
GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.View, error)
GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.RestoredView, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (restore.Progress, error)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/restapi/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type backupTarget struct {
type restoreTarget struct {
restore.Target
Units []restore.Unit
Views []restore.View
Views []restore.RestoredView
Size int64 // Total size of restored tables in bytes.
}

Expand Down
67 changes: 33 additions & 34 deletions pkg/service/restore/alternator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newAlternatorSchemaWorker(client *dynamodb.Client, schema backupspec.Altern

// isAlternatorKeyspace checks if given query.DescribedSchemaRow describes alternator schema.
func (sw *alternatorSchemaWorker) isAlternatorSchemaRow(cql query.DescribedSchemaRow) bool {
_, ok := sw.ksSchema[sw.sanitizeCQLKeyspace(cql)]
_, ok := sw.ksSchema[sanitizeSchemaRowName(cql.Keyspace)]
return ok
}

Expand Down Expand Up @@ -107,11 +107,6 @@ func (sw *alternatorSchemaWorker) tableSchemaToCreate(schema backupspec.Alternat
}
}

// sanitizeCQLKeyspace removes quotes from keyspace name in query.DescribedSchemaRow.
func (sw *alternatorSchemaWorker) sanitizeCQLKeyspace(cql query.DescribedSchemaRow) string {
return strings.TrimPrefix(strings.TrimSuffix(cql.Keyspace, "\""), "\"")
}

// alternatorInitViewsWorker contains tools needed for initializing restored alternator views.
// It basis its knowledge of alternator schema on the alternator schema described from the cluster.
type alternatorInitViewsWorker struct {
Expand Down Expand Up @@ -175,8 +170,8 @@ func (iw *alternatorInitViewsWorker) isAlternatorKeyspace(ks string) bool {
}

// initViews initializes alternator views translating them to Views.
func (iw *alternatorInitViewsWorker) initViews() ([]View, error) {
var views []View
func (iw *alternatorInitViewsWorker) initViews() ([]RestoredView, error) {
var views []RestoredView
for _, schema := range iw.ksSchema {
if schema.Describe == nil || schema.Describe.TableName == nil {
continue
Expand All @@ -195,23 +190,27 @@ func (iw *alternatorInitViewsWorker) initViews() ([]View, error) {
if err != nil {
return nil, err
}
views = append(views, View{
Keyspace: iw.cqlKeyspaceName(t),
View: iw.cqlGSIName(t, *gsi.IndexName),
Type: AlternatorGlobalSecondaryIndex,
BaseTable: t,
views = append(views, RestoredView{
View: View{
Keyspace: iw.cqlKeyspaceName(t),
Name: iw.cqlGSIName(t, *gsi.IndexName),
Type: AlternatorGlobalSecondaryIndex,
BaseTable: t,
},
CreateStmt: string(rawCreateStmt),
})
}
for _, lsi := range schema.Describe.LocalSecondaryIndexes {
if lsi.IndexName == nil {
continue
}
views = append(views, View{
Keyspace: iw.cqlKeyspaceName(t),
View: iw.cqlLSIName(t, *lsi.IndexName),
Type: AlternatorLocalSecondaryIndex,
BaseTable: t,
views = append(views, RestoredView{
View: View{
Keyspace: iw.cqlKeyspaceName(t),
Name: iw.cqlLSIName(t, *lsi.IndexName),
Type: AlternatorLocalSecondaryIndex,
BaseTable: t,
},
})
}
}
Expand Down Expand Up @@ -243,12 +242,12 @@ func (iw *alternatorInitViewsWorker) filterGSIAttr(desc types.TableDescription,
// It basis its knowledge of alternator schema on the initialized Views.
type alternatorDropViewsWorker struct {
alternatorWorker
views []View
views []RestoredView
client *dynamodb.Client
}

// newAlternatorDropViewsWorker creates new alternatorDropViewsWorker.
func newAlternatorDropViewsWorker(ctx context.Context, client *dynamodb.Client, views []View) (*alternatorDropViewsWorker, error) {
func newAlternatorDropViewsWorker(ctx context.Context, client *dynamodb.Client, views []RestoredView) (*alternatorDropViewsWorker, error) {
// Only existing views should be dropped
filteredViews, err := filterAlternatorViews(ctx, client, views, true)
if err != nil {
Expand All @@ -260,8 +259,8 @@ func newAlternatorDropViewsWorker(ctx context.Context, client *dynamodb.Client,
}, nil
}

// isAlternatorView checks if given View is an alternator one.
func (dw *alternatorDropViewsWorker) isAlternatorView(view View) bool {
// isAlternatorView checks if given view is an alternator one.
func (dw *alternatorDropViewsWorker) isAlternatorView(view RestoredView) bool {
return view.Type == AlternatorGlobalSecondaryIndex || view.Type == AlternatorLocalSecondaryIndex
}

Expand All @@ -283,10 +282,10 @@ func (dw *alternatorDropViewsWorker) dropViews(ctx context.Context) error {
return nil
}

func (dw *alternatorDropViewsWorker) viewToDeleteUpdate(view View) (types.GlobalSecondaryIndexUpdate, error) {
func (dw *alternatorDropViewsWorker) viewToDeleteUpdate(view RestoredView) (types.GlobalSecondaryIndexUpdate, error) {
switch view.Type {
case AlternatorGlobalSecondaryIndex:
altView, err := dw.alternatorGSIName(view.BaseTable, view.View)
altView, err := dw.alternatorGSIName(view.BaseTable, view.Name)
if err != nil {
return types.GlobalSecondaryIndexUpdate{}, err
}
Expand All @@ -303,12 +302,12 @@ func (dw *alternatorDropViewsWorker) viewToDeleteUpdate(view View) (types.Global
// alternatorCreateViewsWorker contains tools needed for creating restored alternator views.
// It basis its knowledge of alternator schema on the initialized Views.
type alternatorCreateViewsWorker struct {
views []View
views []RestoredView
client *dynamodb.Client
}

// newAlternatorCreateViewsWorker creates new alternatorCreateViewsWorker.
func newAlternatorCreateViewsWorker(ctx context.Context, client *dynamodb.Client, views []View) (*alternatorCreateViewsWorker, error) {
func newAlternatorCreateViewsWorker(ctx context.Context, client *dynamodb.Client, views []RestoredView) (*alternatorCreateViewsWorker, error) {
// Only non-existing views should be created
filteredViews, err := filterAlternatorViews(ctx, client, views, false)
if err != nil {
Expand All @@ -320,8 +319,8 @@ func newAlternatorCreateViewsWorker(ctx context.Context, client *dynamodb.Client
}, nil
}

// isAlternatorView checks if given View is an alternator one.
func (cw *alternatorCreateViewsWorker) isAlternatorView(view View) bool {
// isAlternatorView checks if given view is an alternator one.
func (cw *alternatorCreateViewsWorker) isAlternatorView(view RestoredView) bool {
return view.Type == AlternatorGlobalSecondaryIndex || view.Type == AlternatorLocalSecondaryIndex
}

Expand All @@ -340,7 +339,7 @@ func (cw *alternatorCreateViewsWorker) createViews(ctx context.Context) error {
return nil
}

func (cw *alternatorCreateViewsWorker) viewToCreateStmt(view View) (*dynamodb.UpdateTableInput, error) {
func (cw *alternatorCreateViewsWorker) viewToCreateStmt(view RestoredView) (*dynamodb.UpdateTableInput, error) {
switch view.Type {
case AlternatorGlobalSecondaryIndex:
var update dynamodb.UpdateTableInput
Expand All @@ -356,12 +355,12 @@ func (cw *alternatorCreateViewsWorker) viewToCreateStmt(view View) (*dynamodb.Up
// filterAlternatorViews is a helper function used for initialization of alternatorDropViewsWorker and alternatorCreateViewsWorker.
// The exist parameter specifies if we want to filter for existing or non-existing views in the current cluster schema.
// Since we don't drop and re-create alternator LSIs, we only need to filter for GSIs.
func filterAlternatorViews(ctx context.Context, client *dynamodb.Client, views []View, exist bool) ([]View, error) {
func filterAlternatorViews(ctx context.Context, client *dynamodb.Client, views []RestoredView, exist bool) ([]RestoredView, error) {
if client == nil {
if ok := slices.ContainsFunc(views, func(v View) bool { return v.Type == AlternatorGlobalSecondaryIndex }); ok {
if ok := slices.ContainsFunc(views, func(v RestoredView) bool { return v.Type == AlternatorGlobalSecondaryIndex }); ok {
return nil, errors.New("uninitialized alternator client with non-empty alternator schema")
}
return []View{}, nil
return []RestoredView{}, nil
}

schema, err := backup.GetAlternatorSchema(ctx, client)
Expand Down Expand Up @@ -389,12 +388,12 @@ func filterAlternatorViews(ctx context.Context, client *dynamodb.Client, views [
}
}

var filteredViews []View
var filteredViews []RestoredView
for _, v := range views {
if v.Type != AlternatorGlobalSecondaryIndex {
continue
}
if _, ok := existingViews[viewKey{t: v.BaseTable, v: v.View}]; ok == exist {
if _, ok := existingViews[viewKey{t: v.BaseTable, v: v.Name}]; ok == exist {
filteredViews = append(filteredViews, v)
}
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type Run struct {
RepairTaskID uuid.UUID // task ID of the automated post-restore repair
// Cache that's initialized once for entire task
Units []Unit
Views []View
Views []RestoredView
}

// Unit represents restored keyspace and its tables with their size.
Expand Down Expand Up @@ -225,27 +225,34 @@ const (
AlternatorLocalSecondaryIndex ViewType = "AlternatorLocalSecondaryIndex"
)

// View represents statement used for recreating restored (dropped) views.
// View describes different types of views.
// It primarily uses CQL names, because those names are also used for
// interacting with views with scylla rest api.
type View struct {
Keyspace string `json:"keyspace" db:"keyspace_name"` // CQL keyspace name. There is no ks abstraction in alternator.
View string `json:"view" db:"view_name"` // CQL view name. Different from alternator name.
Name string `json:"name" db:"view_name"` // CQL view name. Different from alternator name.
Type ViewType `json:"type" db:"view_type"`
BaseTable string `json:"base_table"` // CQL name of the base table. Same as alternator name.
}

// RestoredView represents statement used for recreating restored (dropped) views.
// It primarily uses CQL names, because those names are also used for
// interacting with views with scylla rest api.
type RestoredView struct {
View
// For cql views, CreateStmt is the text encoded cql statement.
// For alternator GSIs, CreateStmt is the json encoded dynamodb.UpdateTableInput.
// For alternator LSIs, CreateStmt is empty, as we don't drop and re-create them.
CreateStmt string `json:"create_stmt"`
BuildStatus scyllaclient.ViewBuildStatus `json:"status"`
}

func (t View) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error) {
func (t RestoredView) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error) {
f := gocqlx.DefaultMapper.FieldByName(reflect.ValueOf(t), name)
return gocql.Marshal(info, f.Interface())
}

func (t *View) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error {
func (t *RestoredView) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error {
f := gocqlx.DefaultMapper.FieldByName(reflect.ValueOf(t), name)
return gocql.Unmarshal(info, data, f.Addr().Interface())
}
Expand Down Expand Up @@ -313,7 +320,7 @@ type Progress struct {
SnapshotTag string `json:"snapshot_tag"`
Keyspaces []KeyspaceProgress `json:"keyspaces,omitempty"`
Hosts []HostProgress `json:"hosts,omitempty"`
Views []View `json:"views,omitempty"`
Views []RestoredView `json:"views,omitempty"`
Stage Stage `json:"stage"`
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/service/restore/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ func TestAggregateProgress(t *testing.T) {
},
},
},
Views: []View{
Views: []RestoredView{
{
Keyspace: ks,
View: "mv",
Type: MaterializedView,
BaseTable: tab1,
View: View{
Keyspace: ks,
Name: "mv",
Type: MaterializedView,
BaseTable: tab1,
},
CreateStmt: "CREATE",
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI
}

// GetTargetUnitsViews returns all information necessary for task validation and --dry-run.
func (s *Service) GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (Target, []Unit, []View, error) {
func (s *Service) GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (Target, []Unit, []RestoredView, error) {
w, err := s.newWorker(ctx, clusterID)
if err != nil {
return Target{}, nil, nil, errors.Wrap(err, "create worker")
Expand Down
14 changes: 7 additions & 7 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
var goldenViews []View
var goldenViews []RestoredView
if err := json.Unmarshal(b, &goldenViews); err != nil {
t.Fatal(err)
}
Expand All @@ -493,17 +493,17 @@ func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) {
}

if goldenViews == nil {
goldenViews = make([]View, 0)
goldenViews = make([]RestoredView, 0)
}
if views == nil {
views = make([]View, 0)
views = make([]RestoredView, 0)
}
if diff := cmp.Diff(goldenViews, views,
cmpopts.SortSlices(func(a, b View) bool { return a.Keyspace+a.View < b.Keyspace+b.View }),
cmpopts.IgnoreSliceElements(func(v View) bool {
return slices.Contains(ignoredViews, v.View)
cmpopts.SortSlices(func(a, b RestoredView) bool { return a.Keyspace+a.Name < b.Keyspace+b.Name }),
cmpopts.IgnoreSliceElements(func(v RestoredView) bool {
return slices.Contains(ignoredViews, v.Name)
}),
cmpopts.IgnoreFields(View{}, "CreateStmt")); diff != "" {
cmpopts.IgnoreFields(RestoredView{}, "CreateStmt")); diff != "" {
t.Fatal(tc.views, diff)
}
})
Expand Down
Loading
Loading