Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/defines/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type SourceScanResKey struct{}

type IgnoreForeignKey struct{}

type AlterCopyDedupOpt struct{}
type AlterCopyOpt struct{}

// Determine if now is a bg sql.
type BgKey struct{}
Expand Down
2,483 changes: 1,628 additions & 855 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/multi_update/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (update *MultiUpdate) insert_secondary_index_table(
attrs = append(attrs, col.Name)
}
}

ctr.insertBuf[tableIndex] = batch.New(attrs)
for insertIdx, inputIdx := range updateCtx.InsertCols {
ctr.insertBuf[tableIndex].Vecs[insertIdx] = vector.NewVec(*inputBatch.Vecs[inputIdx].GetType())
Expand All @@ -131,6 +132,7 @@ func (update *MultiUpdate) insert_table(
inputBatch *batch.Batch,
insertBatch *batch.Batch,
) (err error) {

insertBatch.CleanOnlyData()
for insertIdx, inputIdx := range updateCtx.InsertCols {
err = insertBatch.Vecs[insertIdx].UnionBatch(inputBatch.Vecs[inputIdx], 0, inputBatch.Vecs[inputIdx].Length(), nil, proc.GetMPool())
Expand Down
15 changes: 12 additions & 3 deletions pkg/sql/colexec/table_clone/table_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
Expand Down Expand Up @@ -137,9 +138,17 @@ func (tc *TableClone) Prepare(proc *process.Process) error {
// the src table is a publication
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, uint32(tc.Ctx.SrcObjDef.PubInfo.TenantId))

} else if tc.Ctx.ScanSnapshot != nil && tc.Ctx.ScanSnapshot.Tenant != nil {
// the source data may be coming from a different account.
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, tc.Ctx.ScanSnapshot.Tenant.TenantID)
} else if tc.Ctx.ScanSnapshot != nil {
if tc.Ctx.ScanSnapshot.Tenant != nil {
// the source data may be coming from a different account.
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, tc.Ctx.ScanSnapshot.Tenant.TenantID)
}

// without setting this scan ts, we could read the newly created table !!!
if tc.Ctx.ScanSnapshot.TS != nil {
txnOp = proc.GetTxnOperator().CloneSnapshotOp(*tc.Ctx.ScanSnapshot.TS)
proc.SetCloneTxnOperator(txnOp)
}
}

txnOp = proc.GetCloneTxnOperator()
Expand Down
133 changes: 116 additions & 17 deletions pkg/sql/compile/alter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package compile
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_clone"
"slices"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -141,8 +144,8 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}
opt := executor.StatementOption{}
if qry.DedupOpt.SkipPkDedup || len(qry.DedupOpt.SkipUniqueIdxDedup) > 0 {
opt = opt.WithAlterCopyDedupOpt(qry.DedupOpt)
if qry.Options.SkipPkDedup || len(qry.Options.SkipUniqueIdxDedup) > 0 {
opt = opt.WithAlterCopyOpt(qry.Options)
}
// 4. copy the original table data to the temporary replica table
err = c.runSqlWithOptions(qry.InsertTmpDataSql, opt)
Expand All @@ -156,7 +159,25 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}

// 5. drop original table
//5. obtain relation for new tables
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
if err != nil {
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
zap.String("databaseName", dbName),
zap.String("origin tableName", qry.GetTableDef().Name),
zap.String("copy table name", qry.CopyTableDef.Name),
zap.Error(err))
return err
}

//6. copy on writing unaffected index table
if err = cowUnaffectedIndexes(
c, dbName, qry.AffectedCols, newRel, qry.TableDef, nil,
); err != nil {
return err
}

// 7. drop original table
dropSql := fmt.Sprintf("drop table `%s`.`%s`", dbName, tblName)
if err := c.runSqlWithOptions(
dropSql,
Expand All @@ -170,7 +191,7 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
return err
}

// 5.1 delete all index objects of the table in mo_catalog.mo_indexes
// 7.1 delete all index objects of the table in mo_catalog.mo_indexes
if qry.Database != catalog.MO_CATALOG && qry.TableDef.Name != catalog.MO_INDEXES {
if qry.GetTableDef().Pkey != nil || len(qry.GetTableDef().Indexes) > 0 {
deleteSql := fmt.Sprintf(
Expand All @@ -190,20 +211,9 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
}
}

//6. obtain relation for new tables
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
if err != nil {
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
zap.String("databaseName", dbName),
zap.String("origin tableName", qry.GetTableDef().Name),
zap.String("copy table name", qry.CopyTableDef.Name),
zap.Error(err))
return err
}

newId := newRel.GetTableID(c.proc.Ctx)
//-------------------------------------------------------------------------
// 7. rename temporary replica table into the original table(Table Id remains unchanged)
// 8. rename temporary replica table into the original table(Table Id remains unchanged)
copyTblName := qry.CopyTableDef.Name
req := api.NewRenameTableReq(
newRel.GetDBID(c.proc.Ctx),
Expand All @@ -226,7 +236,7 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
}
//--------------------------------------------------------------------------------------------------------------
{
// 8. invoke reindex for the new table, if it contains ivf index.
// 9. invoke reindex for the new table, if it contains ivf index.
multiTableIndexes := make(map[string]*MultiTableIndex)
newTableDef := newRel.CopyTableDef(c.proc.Ctx)
extra := newRel.GetExtraInfo()
Expand Down Expand Up @@ -545,3 +555,92 @@ func notifyParentTableFkTableIdChange(c *Compile, fkey *plan.ForeignKeyDef, oldT
}
return fatherRelation.UpdateConstraint(c.proc.Ctx, oldCt)
}

func cowUnaffectedIndexes(
c *Compile,
dbName string,
affectedCols []string,
newRel engine.Relation,
oriTblDef *plan.TableDef,
cloneSnapshot *plan.Snapshot,
) (err error) {

var (
clone *table_clone.TableClone

oriIdxTblDef *plan.TableDef
oriIdxObjRef *plan.ObjectRef

newTblDef = newRel.GetTableDef(c.proc.Ctx)

oriIdxColNameToTblName = make(map[string]string)
newIdxTColNameToTblName = make(map[string]string)
)

releaseClone := func() {
if clone != nil {
clone.Free(c.proc, false, err)
reuse.Free[table_clone.TableClone](clone, nil)
clone = nil
}
}

defer func() {
releaseClone()
}()

for _, idxTbl := range oriTblDef.Indexes {
if slices.Index(affectedCols, idxTbl.IndexName) != -1 {
continue
}

oriIdxColNameToTblName[idxTbl.IndexName] = idxTbl.IndexTableName
}

for _, idxTbl := range newTblDef.Indexes {
newIdxTColNameToTblName[idxTbl.IndexName] = idxTbl.IndexTableName
}

cctx := compilerContext{
ctx: c.proc.Ctx,
defaultDB: dbName,
engine: c.e,
proc: c.proc,
}

for colName, oriIdxTblName := range oriIdxColNameToTblName {
newIdxTblName, ok := newIdxTColNameToTblName[colName]
if !ok {
continue
}

oriIdxObjRef, oriIdxTblDef, err = cctx.Resolve(dbName, oriIdxTblName, cloneSnapshot)

clonePlan := plan.CloneTable{
CreateTable: nil,
ScanSnapshot: cloneSnapshot,
SrcTableDef: oriIdxTblDef,
SrcObjDef: oriIdxObjRef,
DstDatabaseName: dbName,
DstTableName: newIdxTblName,
}

if clone, err = constructTableClone(c, &clonePlan); err != nil {
return err
}

if err = clone.Prepare(c.proc); err != nil {
releaseClone()
return err
}

if _, err = clone.Call(c.proc); err != nil {
releaseClone()
return err
}

releaseClone()
}

return nil
}
16 changes: 5 additions & 11 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4926,26 +4926,20 @@ func (c *Compile) compileTableClone(
err error
s1 *Scope

nodes []engine.Node
cloneQry = pn.GetDdl().Query
node engine.Node
clonePlan = pn.GetDdl().GetCloneTable()
)

nodes, err = c.generateNodes(cloneQry.Nodes[0])
if err != nil {
return nil, err
}
node = getEngineNode(c)

copyOp, err := constructTableClone(c, cloneQry.Nodes[0])
copyOp, err := constructTableClone(c, clonePlan)
if err != nil {
return nil, err
}

s1 = newScope(TableClone)
s1.NodeInfo = nodes[0]
s1.NodeInfo = node
s1.TxnOffset = c.TxnOffset
s1.DataSource = &Source{
node: cloneQry.Nodes[0],
}
s1.Plan = pn

s1.Proc = c.proc.NewNoContextChildProc(0)
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2931,8 +2931,13 @@ func (s *Scope) TableClone(c *Compile) error {
err error
)

if err = s.CreateTable(c); err != nil {
return err
clonePlan := s.Plan.GetDdl().GetCloneTable()

if clonePlan.CreateTable != nil {
s.Plan = clonePlan.CreateTable
if err = s.CreateTable(c); err != nil {
return err
}
}

return s.Run(c)
Expand Down
29 changes: 16 additions & 13 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2376,19 +2376,19 @@ func constructPostDml(n *plan.Node, eg engine.Engine) *postdml.PostDml {

func constructTableClone(
c *Compile,
n *plan.Node,
clonePlan *plan.CloneTable,
) (*table_clone.TableClone, error) {

metaCopy := table_clone.NewTableClone()

metaCopy.Ctx = &table_clone.TableCloneCtx{
Eng: c.e,
SrcTblDef: n.TableDef,
SrcObjDef: n.ObjRef,
SrcTblDef: clonePlan.SrcTableDef,
SrcObjDef: clonePlan.SrcObjDef,

ScanSnapshot: n.ScanSnapshot,
DstTblName: n.InsertCtx.TableDef.Name,
DstDatabaseName: n.InsertCtx.TableDef.DbName,
ScanSnapshot: clonePlan.ScanSnapshot,
DstTblName: clonePlan.DstTableName,
DstDatabaseName: clonePlan.DstDatabaseName,
}

var (
Expand All @@ -2401,7 +2401,7 @@ func constructTableClone(
hasAutoIncr bool
)

for _, colDef := range n.TableDef.Cols {
for _, colDef := range clonePlan.SrcTableDef.Cols {
if colDef.Typ.AutoIncr {
hasAutoIncr = true
break
Expand All @@ -2413,17 +2413,20 @@ func constructTableClone(
}

sql = fmt.Sprintf(
"select col_index, offset from mo_catalog.mo_increment_columns where table_id = %d", n.TableDef.TblId)
"select col_index, offset from mo_catalog.mo_increment_columns where table_id = %d",
clonePlan.SrcTableDef.TblId,
)

if n.ScanSnapshot != nil {
if n.ScanSnapshot.Tenant != nil {
account = n.ScanSnapshot.Tenant.TenantID
if clonePlan.ScanSnapshot != nil {
if clonePlan.ScanSnapshot.Tenant != nil {
account = clonePlan.ScanSnapshot.Tenant.TenantID
}

if n.ScanSnapshot.TS != nil {
if clonePlan.ScanSnapshot.TS != nil {
sql = fmt.Sprintf(
"select col_index, offset from mo_catalog.mo_increment_columns {MO_TS = %d} where table_id = %d",
n.ScanSnapshot.TS.PhysicalTime, n.TableDef.TblId)
clonePlan.ScanSnapshot.TS.PhysicalTime, clonePlan.SrcTableDef.TblId,
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (exec *txnExecutor) Exec(

if v := statementOption.AlterCopyDedupOpt(); v != nil {
exec.ctx = context.WithValue(exec.ctx,
defines.AlterCopyDedupOpt{}, v)
defines.AlterCopyOpt{}, v)
}

receiveAt := time.Now()
Expand Down
26 changes: 18 additions & 8 deletions pkg/sql/plan/bind_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,22 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
}
}
} else {
var skipPkDedup bool
var skipUniqueIdxDedup map[string]bool
if v := builder.compCtx.GetContext().Value(defines.AlterCopyDedupOpt{}); v != nil {
dedupOpt := v.(*plan.AlterCopyDedupOpt)
if dedupOpt.TargetTableName == tableDef.Name {

var (
option *plan.AlterCopyOpt
skipPkDedup bool
skipUniqueIdxDedup map[string]bool
)

if v := builder.compCtx.GetContext().Value(defines.AlterCopyOpt{}); v != nil {
option = v.(*plan.AlterCopyOpt)
if option.TargetTableName == tableDef.Name {
logutil.Info("alter copy dedup exec",
zap.String("tableDef", tableDef.Name),
zap.Any("dedupOpt", dedupOpt))
skipPkDedup = dedupOpt.SkipPkDedup
skipUniqueIdxDedup = dedupOpt.SkipUniqueIdxDedup
zap.Any("option", option),
)
skipPkDedup = option.SkipPkDedup
skipUniqueIdxDedup = option.SkipUniqueIdxDedup
}
}

Expand Down Expand Up @@ -382,6 +388,10 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
continue
}

if option != nil && option.SkipIndexesCopy[idxDef.IndexName] {
continue
}

idxObjRefs[i], idxTableDefs[i], err = builder.compCtx.ResolveIndexTableByRef(objRef, idxDef.IndexTableName, bindCtx.snapshot)
if err != nil {
return 0, err
Expand Down
Loading
Loading