Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion pkg/frontend/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/defines"
plan2 "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
)

func getOpAndToAccountId(
Expand Down Expand Up @@ -182,6 +185,8 @@ func handleCloneDatabase(
sortedFkTbls []string
fkTableMap map[string]*tableInfo
snapCondition string

snapshotTS int64
)

bh = ses.GetBackgroundExec(reqCtx)
Expand Down Expand Up @@ -235,14 +240,36 @@ func handleCloneDatabase(
return err
}

// consider the following example:
// (within a session)
// ...
// insert into t1 values (1) ---> commit ts (P2-L3)
// insert into t1 values (2) ---> commit ts (P2-L3)
// create table t2 clone t1 ---> the read snapshot ts is P2.
//
// limited by the format for the snapshot read TS, the logic TS is truncated,
// so in this example, the clone cannot read the newly inserted data.
//
// so we try to increase the txn physical ts here to make sure the snapshot TS
// the clone will get is greater than P2.
if snapshotTS, err = tryToIncreaseTxnPhysicalTS(
reqCtx, ses.proc.GetTxnOperator(),
); err != nil {
return err
}

ctx2 = defines.AttachAccountId(reqCtx, toAccountId)

cloneTable := func(dstDb, dstTbl, srcDb, srcTbl string) error {
sql := fmt.Sprintf(
"create table `%s`.`%s` clone `%s`.`%s`", dstDb, dstTbl, srcDb, srcTbl)
"create table `%s`.`%s` clone `%s`.`%s`",
dstDb, dstTbl, srcDb, srcTbl,
)

if snapCondition != "" {
sql = sql + " " + snapCondition
} else {
sql = sql + fmt.Sprintf(" {MO_TS = %d}", snapshotTS)
}

if err = bh.ExecRestore(ctx2, sql, opAccountId, toAccountId); err != nil {
Expand Down Expand Up @@ -318,3 +345,33 @@ func handleCloneDatabase(

return nil
}

func tryToIncreaseTxnPhysicalTS(
ctx context.Context, txnOp client.TxnOperator,
) (updatedPhysical int64, err error) {

curTxnPhysicalTS := txnOp.SnapshotTS().PhysicalTime

if ctx.Value(defines.TenantIDKey{}) == nil {
return curTxnPhysicalTS, nil
}

// a slight increase added to the physical to make sure
// the updated ts is greater than the old txn timestamp (physical + logic)
curTxnPhysicalTS += int64(time.Microsecond)
if err = txnOp.UpdateSnapshot(ctx, timestamp.Timestamp{
PhysicalTime: curTxnPhysicalTS,
}); err != nil {
return
}

updatedPhysical = txnOp.SnapshotTS().PhysicalTime
if updatedPhysical <= curTxnPhysicalTS {
return 0, moerr.NewInternalErrorNoCtxf("try to update the snapshot ts failed in clone database")
}

// return a nanosecond precision
updatedPhysical -= int64(time.Nanosecond)

return updatedPhysical, nil
}
19 changes: 15 additions & 4 deletions pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,25 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
}
snapshotId = newUUid.String()

var (
snapshotTS int64
)

// refer to the `handleCloneDatabase`
if snapshotTS, err = tryToIncreaseTxnPhysicalTS(
ctx, ses.proc.GetTxnOperator(),
); err != nil {
return err
}

// 3. get database name , table name and objId according to the snapshot level
switch snapshotLevel {
case tree.SNAPSHOTLEVELCLUSTER:
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotTS,
snapshotLevel.String(),
"",
"",
Expand Down Expand Up @@ -296,7 +307,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotTS,
snapshotLevel.String(),
snapshotForAccount,
"",
Expand Down Expand Up @@ -350,7 +361,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotTS,
snapshotLevel.String(),
currentAccount,
databaseName,
Expand Down Expand Up @@ -414,7 +425,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotTS,
snapshotLevel.String(),
currentAccount,
databaseName,
Expand Down
Loading