Skip to content

Commit 9bfb8b1

Browse files
authored
make sure the snapshot read TS greater than the TXN logic TS when create snapshot or clone. (#22252)
consider the following example: (within a session) ... insert into t1 values (1) ---> commit ts (P2-L3) insert into t1 values (2) ---> commit ts (P2-L4) create table t2 clone t1 ---> the read snapshot ts is P2. limited by the format for the snapshot read TS, the logic TS is truncated, so in this example, the clone cannot read the newly inserted data. so we try to increase the txn physical ts here to make sure the snapshot TS the clone will get is greater than P2. the same issue exists when create snapshot. Approved by: @qingxinhome
1 parent e0c9a88 commit 9bfb8b1

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

pkg/frontend/clone.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ import (
1919
"fmt"
2020
"regexp"
2121
"strings"
22+
"time"
2223

2324
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2425
"github.com/matrixorigin/matrixone/pkg/defines"
2526
plan2 "github.com/matrixorigin/matrixone/pkg/pb/plan"
27+
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
2628
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
2729
"github.com/matrixorigin/matrixone/pkg/sql/plan"
30+
"github.com/matrixorigin/matrixone/pkg/txn/client"
2831
)
2932

3033
func getOpAndToAccountId(
@@ -182,6 +185,8 @@ func handleCloneDatabase(
182185
sortedFkTbls []string
183186
fkTableMap map[string]*tableInfo
184187
snapCondition string
188+
189+
snapshotTS int64
185190
)
186191

187192
bh = ses.GetBackgroundExec(reqCtx)
@@ -235,14 +240,36 @@ func handleCloneDatabase(
235240
return err
236241
}
237242

243+
// consider the following example:
244+
// (within a session)
245+
// ...
246+
// insert into t1 values (1) ---> commit ts (P2-L3)
247+
// insert into t1 values (2) ---> commit ts (P2-L3)
248+
// create table t2 clone t1 ---> the read snapshot ts is P2.
249+
//
250+
// limited by the format for the snapshot read TS, the logic TS is truncated,
251+
// so in this example, the clone cannot read the newly inserted data.
252+
//
253+
// so we try to increase the txn physical ts here to make sure the snapshot TS
254+
// the clone will get is greater than P2.
255+
if snapshotTS, err = tryToIncreaseTxnPhysicalTS(
256+
reqCtx, ses.proc.GetTxnOperator(),
257+
); err != nil {
258+
return err
259+
}
260+
238261
ctx2 = defines.AttachAccountId(reqCtx, toAccountId)
239262

240263
cloneTable := func(dstDb, dstTbl, srcDb, srcTbl string) error {
241264
sql := fmt.Sprintf(
242-
"create table `%s`.`%s` clone `%s`.`%s`", dstDb, dstTbl, srcDb, srcTbl)
265+
"create table `%s`.`%s` clone `%s`.`%s`",
266+
dstDb, dstTbl, srcDb, srcTbl,
267+
)
243268

244269
if snapCondition != "" {
245270
sql = sql + " " + snapCondition
271+
} else {
272+
sql = sql + fmt.Sprintf(" {MO_TS = %d}", snapshotTS)
246273
}
247274

248275
if err = bh.ExecRestore(ctx2, sql, opAccountId, toAccountId); err != nil {
@@ -318,3 +345,33 @@ func handleCloneDatabase(
318345

319346
return nil
320347
}
348+
349+
func tryToIncreaseTxnPhysicalTS(
350+
ctx context.Context, txnOp client.TxnOperator,
351+
) (updatedPhysical int64, err error) {
352+
353+
curTxnPhysicalTS := txnOp.SnapshotTS().PhysicalTime
354+
355+
if ctx.Value(defines.TenantIDKey{}) == nil {
356+
return curTxnPhysicalTS, nil
357+
}
358+
359+
// a slight increase added to the physical to make sure
360+
// the updated ts is greater than the old txn timestamp (physical + logic)
361+
curTxnPhysicalTS += int64(time.Microsecond)
362+
if err = txnOp.UpdateSnapshot(ctx, timestamp.Timestamp{
363+
PhysicalTime: curTxnPhysicalTS,
364+
}); err != nil {
365+
return
366+
}
367+
368+
updatedPhysical = txnOp.SnapshotTS().PhysicalTime
369+
if updatedPhysical <= curTxnPhysicalTS {
370+
return 0, moerr.NewInternalErrorNoCtxf("try to update the snapshot ts failed in clone database")
371+
}
372+
373+
// return a nanosecond precision
374+
updatedPhysical -= int64(time.Nanosecond)
375+
376+
return updatedPhysical, nil
377+
}

pkg/frontend/snapshot.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,25 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
228228
}
229229
snapshotId = newUUid.String()
230230

231+
var (
232+
snapshotTS int64
233+
)
234+
235+
// refer to the `handleCloneDatabase`
236+
if snapshotTS, err = tryToIncreaseTxnPhysicalTS(
237+
ctx, ses.proc.GetTxnOperator(),
238+
); err != nil {
239+
return err
240+
}
241+
231242
// 3. get database name , table name and objId according to the snapshot level
232243
switch snapshotLevel {
233244
case tree.SNAPSHOTLEVELCLUSTER:
234245
sql, err = getSqlForCreateSnapshot(
235246
ctx,
236247
snapshotId,
237248
snapshotName,
238-
time.Now().UTC().UnixNano(),
249+
snapshotTS,
239250
snapshotLevel.String(),
240251
"",
241252
"",
@@ -296,7 +307,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
296307
ctx,
297308
snapshotId,
298309
snapshotName,
299-
time.Now().UTC().UnixNano(),
310+
snapshotTS,
300311
snapshotLevel.String(),
301312
snapshotForAccount,
302313
"",
@@ -350,7 +361,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
350361
ctx,
351362
snapshotId,
352363
snapshotName,
353-
time.Now().UTC().UnixNano(),
364+
snapshotTS,
354365
snapshotLevel.String(),
355366
currentAccount,
356367
databaseName,
@@ -414,7 +425,7 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
414425
ctx,
415426
snapshotId,
416427
snapshotName,
417-
time.Now().UTC().UnixNano(),
428+
snapshotTS,
418429
snapshotLevel.String(),
419430
currentAccount,
420431
databaseName,

0 commit comments

Comments
 (0)