Skip to content

Commit 52844ed

Browse files
authored
make runtime filter work on non-PK column (#22045)
Previously runtime filter on non-PK column only take effect in single-concurrency queries (i.e., tpcc). This PR makes it work as expected. Approved by: @ouyuanning
1 parent 5c4f615 commit 52844ed

File tree

12 files changed

+628
-564
lines changed

12 files changed

+628
-564
lines changed

pkg/pb/pipeline/pipeline.pb.go

Lines changed: 486 additions & 421 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/colexec/filter/filter.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2222
"github.com/matrixorigin/matrixone/pkg/container/batch"
2323
"github.com/matrixorigin/matrixone/pkg/container/vector"
24-
"github.com/matrixorigin/matrixone/pkg/pb/plan"
2524
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
2625
"github.com/matrixorigin/matrixone/pkg/vm"
2726
"github.com/matrixorigin/matrixone/pkg/vm/process"
@@ -31,7 +30,7 @@ const opName = "filter"
3130

3231
func (filter *Filter) String(buf *bytes.Buffer) {
3332
buf.WriteString(opName)
34-
buf.WriteString(fmt.Sprintf("filter(%s)", filter.E))
33+
buf.WriteString(fmt.Sprintf("filter(%s)", filter.FilterExprs))
3534
}
3635

3736
func (filter *Filter) OpType() vm.OpType {
@@ -45,19 +44,30 @@ func (filter *Filter) Prepare(proc *process.Process) (err error) {
4544
filter.OpAnalyzer.Reset()
4645
}
4746

48-
if len(filter.ctr.executors) == 0 && filter.E != nil {
49-
filter.ctr.executors, err = colexec.NewExpressionExecutorsFromPlanExpressions(proc, colexec.SplitAndExprs([]*plan.Expr{filter.E}))
47+
if len(filter.ctr.executors) == 0 && filter.FilterExprs != nil {
48+
filter.ctr.executors, err = colexec.NewExpressionExecutorsFromPlanExpressions(proc, filter.FilterExprs)
49+
if err != nil {
50+
return
51+
}
52+
}
53+
54+
if len(filter.RuntimeFilterExprs) > 0 {
55+
filter.ctr.runtimeExecutors, err = colexec.NewExpressionExecutorsFromPlanExpressions(proc, filter.RuntimeFilterExprs)
56+
if err != nil {
57+
return
58+
}
5059
}
5160

5261
if filter.ctr.allExecutors == nil {
5362
filter.ctr.allExecutors = make([]colexec.ExpressionExecutor, 0, len(filter.ctr.runtimeExecutors)+len(filter.ctr.executors))
5463
} else {
5564
filter.ctr.allExecutors = filter.ctr.allExecutors[:0]
5665
}
66+
5767
filter.ctr.allExecutors = append(filter.ctr.allExecutors, filter.ctr.runtimeExecutors...)
5868
filter.ctr.allExecutors = append(filter.ctr.allExecutors, filter.ctr.executors...)
5969

60-
return err
70+
return
6171
}
6272

6373
func (filter *Filter) Call(proc *process.Process) (vm.CallResult, error) {

pkg/sql/colexec/filter/filter_test.go

Lines changed: 58 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ import (
1818
"context"
1919
"testing"
2020

21-
"github.com/stretchr/testify/require"
22-
2321
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2422
"github.com/matrixorigin/matrixone/pkg/container/batch"
2523
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -31,6 +29,7 @@ import (
3129
"github.com/matrixorigin/matrixone/pkg/testutil"
3230
"github.com/matrixorigin/matrixone/pkg/vm"
3331
"github.com/matrixorigin/matrixone/pkg/vm/process"
32+
"github.com/stretchr/testify/require"
3433
)
3534

3635
type filterTestCase struct {
@@ -45,9 +44,6 @@ func init() {
4544
boolType := types.T_bool.ToType()
4645
int32Type := types.T_int32.ToType()
4746

48-
fr0, _ := function.GetFunctionByName(context.TODO(), "and", []types.Type{boolType, boolType})
49-
fid0 := fr0.GetEncodedOverloadID()
50-
5147
fr1, _ := function.GetFunctionByName(context.TODO(), ">", []types.Type{int32Type, int32Type})
5248
fid1 := fr1.GetEncodedOverloadID()
5349

@@ -59,27 +55,29 @@ func init() {
5955
{
6056
proc: testutil.NewProcessWithMPool("", mpool.MustNewZero()),
6157
arg: &Filter{
62-
E: &plan.Expr{
63-
Typ: plan2.MakePlan2Type(&boolType),
64-
Expr: &plan.Expr_F{
65-
F: &plan.Function{
66-
Func: &plan.ObjectRef{
67-
ObjName: ">",
68-
Obj: fid1,
69-
},
58+
FilterExprs: []*plan.Expr{
59+
{
60+
Typ: plan2.MakePlan2Type(&boolType),
61+
Expr: &plan.Expr_F{
62+
F: &plan.Function{
63+
Func: &plan.ObjectRef{
64+
ObjName: ">",
65+
Obj: fid1,
66+
},
7067

71-
Args: []*plan.Expr{
72-
{
73-
Typ: plan2.MakePlan2Type(&int32Type),
74-
Expr: &plan.Expr_Col{
75-
Col: &plan.ColRef{
76-
RelPos: 0,
77-
ColPos: 0,
78-
Name: "a",
68+
Args: []*plan.Expr{
69+
{
70+
Typ: plan2.MakePlan2Type(&int32Type),
71+
Expr: &plan.Expr_Col{
72+
Col: &plan.ColRef{
73+
RelPos: 0,
74+
ColPos: 0,
75+
Name: "a",
76+
},
7977
},
8078
},
79+
makePlan2Int32ConstExprWithType(10),
8180
},
82-
makePlan2Int32ConstExprWithType(10),
8381
},
8482
},
8583
},
@@ -98,64 +96,53 @@ func init() {
9896
{
9997
proc: testutil.NewProcessWithMPool("", mpool.MustNewZero()),
10098
arg: &Filter{
101-
E: &plan.Expr{
102-
Typ: plan2.MakePlan2Type(&boolType),
103-
Expr: &plan.Expr_F{
104-
F: &plan.Function{
105-
Func: &plan.ObjectRef{
106-
ObjName: "and",
107-
Obj: fid0,
108-
},
109-
Args: []*plan.Expr{
110-
{
111-
Typ: plan2.MakePlan2Type(&boolType),
112-
Expr: &plan.Expr_F{
113-
F: &plan.Function{
114-
Func: &plan.ObjectRef{
115-
ObjName: ">",
116-
Obj: fid1,
117-
},
99+
FilterExprs: []*plan.Expr{
100+
{
101+
Typ: plan2.MakePlan2Type(&boolType),
102+
Expr: &plan.Expr_F{
103+
F: &plan.Function{
104+
Func: &plan.ObjectRef{
105+
ObjName: ">",
106+
Obj: fid1,
107+
},
118108

119-
Args: []*plan.Expr{
120-
{
121-
Typ: plan2.MakePlan2Type(&int32Type),
122-
Expr: &plan.Expr_Col{
123-
Col: &plan.ColRef{
124-
RelPos: 0,
125-
ColPos: 0,
126-
Name: "a",
127-
},
128-
},
129-
},
130-
makePlan2Int32ConstExprWithType(10),
109+
Args: []*plan.Expr{
110+
{
111+
Typ: plan2.MakePlan2Type(&int32Type),
112+
Expr: &plan.Expr_Col{
113+
Col: &plan.ColRef{
114+
RelPos: 0,
115+
ColPos: 0,
116+
Name: "a",
131117
},
132118
},
133119
},
120+
makePlan2Int32ConstExprWithType(10),
121+
},
122+
},
123+
},
124+
},
125+
{
126+
Typ: plan2.MakePlan2Type(&boolType),
127+
Expr: &plan.Expr_F{
128+
F: &plan.Function{
129+
Func: &plan.ObjectRef{
130+
ObjName: "<",
131+
Obj: fid2,
134132
},
135-
{
136-
Typ: plan2.MakePlan2Type(&boolType),
137-
Expr: &plan.Expr_F{
138-
F: &plan.Function{
139-
Func: &plan.ObjectRef{
140-
ObjName: "<",
141-
Obj: fid2,
142-
},
143133

144-
Args: []*plan.Expr{
145-
{
146-
Typ: plan2.MakePlan2Type(&int32Type),
147-
Expr: &plan.Expr_Col{
148-
Col: &plan.ColRef{
149-
RelPos: 0,
150-
ColPos: 1,
151-
Name: "b",
152-
},
153-
},
154-
},
155-
makePlan2Int32ConstExprWithType(40),
134+
Args: []*plan.Expr{
135+
{
136+
Typ: plan2.MakePlan2Type(&int32Type),
137+
Expr: &plan.Expr_Col{
138+
Col: &plan.ColRef{
139+
RelPos: 0,
140+
ColPos: 1,
141+
Name: "b",
156142
},
157143
},
158144
},
145+
makePlan2Int32ConstExprWithType(40),
159146
},
160147
},
161148
},

pkg/sql/colexec/filter/types.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ import (
2727
var _ vm.Operator = new(Filter)
2828

2929
type Filter struct {
30-
ctr container
31-
E *plan.Expr
32-
IsEnd bool
30+
ctr container
31+
FilterExprs []*plan.Expr
32+
RuntimeFilterExprs []*plan.Expr
33+
IsEnd bool
3334

3435
vm.OperatorBase
3536
}
@@ -74,15 +75,15 @@ type container struct {
7475
allExecutors []colexec.ExpressionExecutor // = executors + runtimeExecutor, do not free this executors
7576
}
7677

77-
func (filter *Filter) SetRuntimeExpr(proc *process.Process, exes []*plan.Expr) (err error) {
78-
filter.ctr.cleanRuntimeExecutor()
79-
filter.ctr.runtimeExecutors, err = colexec.NewExpressionExecutorsFromPlanExpressions(proc, exes)
78+
func (filter *Filter) SetRuntimeFilterExprs(proc *process.Process, exes []*plan.Expr) (err error) {
79+
filter.RuntimeFilterExprs = exes
8080
return
8181
}
8282

8383
func (filter *Filter) Reset(proc *process.Process, pipelineFailed bool, err error) {
8484
filter.ctr.resetExecutor()
8585
filter.ctr.cleanRuntimeExecutor()
86+
filter.RuntimeFilterExprs = nil
8687
}
8788

8889
func (filter *Filter) Free(proc *process.Process, pipelineFailed bool, err error) {
@@ -92,6 +93,7 @@ func (filter *Filter) Free(proc *process.Process, pipelineFailed bool, err error
9293
if filter.ctr.buf != nil {
9394
filter.ctr.buf.Clean(proc.Mp())
9495
}
96+
filter.RuntimeFilterExprs = nil
9597
}
9698

9799
func (filter *Filter) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {

pkg/sql/compile/compile.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,10 +2129,9 @@ func (c *Compile) compileRestrict(n *plan.Node, ss []*Scope) []*Scope {
21292129
return ss
21302130
}
21312131
currentFirstFlag := c.anal.isFirst
2132-
filterExpr := colexec.RewriteFilterExprList(plan2.DeepCopyExprList(n.FilterList))
21332132
var op *filter.Filter
21342133
for i := range ss {
2135-
op = constructRestrict(n, filterExpr)
2134+
op = constructRestrict(n, plan2.DeepCopyExprList(n.FilterList))
21362135
op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
21372136
ss[i].setRootOperator(op)
21382137
}

pkg/sql/compile/operator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,8 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
344344
case vm.Filter:
345345
t := sourceOp.(*filter.Filter)
346346
op := filter.NewArgument()
347-
op.E = t.E
347+
op.FilterExprs = t.FilterExprs
348+
op.RuntimeFilterExprs = t.RuntimeFilterExprs
348349
op.SetInfo(&info)
349350
return op
350351
case vm.Semi:
@@ -664,9 +665,9 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
664665
panic(fmt.Sprintf("unexpected instruction type '%d' to dup", sourceOp.OpType()))
665666
}
666667

667-
func constructRestrict(n *plan.Node, filterExpr *plan.Expr) *filter.Filter {
668+
func constructRestrict(n *plan.Node, filterExprs []*plan.Expr) *filter.Filter {
668669
op := filter.NewArgument()
669-
op.E = filterExpr
670+
op.FilterExprs = filterExprs
670671
op.IsEnd = n.IsEnd
671672
return op
672673
}

pkg/sql/compile/remoterun.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,8 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
642642
case *projection.Projection:
643643
in.ProjectList = t.ProjectList
644644
case *filter.Filter:
645-
in.Filter = t.E
645+
in.Filters = t.FilterExprs
646+
in.RuntimeFilters = t.RuntimeFilterExprs
646647
case *semi.SemiJoin:
647648
in.SemiJoin = &pipeline.SemiJoin{
648649
Result: t.Result,
@@ -1155,7 +1156,8 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
11551156
op = arg
11561157
case vm.Filter:
11571158
arg := filter.NewArgument()
1158-
arg.E = opr.Filter
1159+
arg.FilterExprs = opr.Filters
1160+
arg.RuntimeFilterExprs = opr.RuntimeFilters
11591161
op = arg
11601162
case vm.Semi:
11611163
t := opr.GetSemiJoin()

pkg/sql/compile/remoterun_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func Test_convertToVmInstruction(t *testing.T) {
299299
{Op: int32(vm.Product), Product: &pipeline.Product{}},
300300
{Op: int32(vm.ProductL2), ProductL2: &pipeline.ProductL2{}},
301301
{Op: int32(vm.Projection), ProjectList: []*plan.Expr{}},
302-
{Op: int32(vm.Filter), Filter: &plan.Expr{}},
302+
{Op: int32(vm.Filter), Filters: []*plan.Expr{}, RuntimeFilters: []*plan.Expr{}},
303303
{Op: int32(vm.Semi), SemiJoin: &pipeline.SemiJoin{}},
304304
{Op: int32(vm.Single), SingleJoin: &pipeline.SingleJoin{}},
305305
{Op: int32(vm.Top), Limit: plan.MakePlan2Int64ConstExprWithType(1)},

pkg/sql/compile/scope.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -703,9 +703,9 @@ func (s *Scope) handleRuntimeFilters(c *Compile, runtimeInExprList []*plan.Expr)
703703
panic("only support col in runtime filter's left child!")
704704
}
705705
if rfSpecs[i].NotOnPk {
706-
nonPkFilters = append(nonPkFilters, plan2.DeepCopyExpr(runtimeInExprList[i]))
706+
nonPkFilters = append(nonPkFilters, runtimeInExprList[i])
707707
} else {
708-
pkFilters = append(pkFilters, plan2.DeepCopyExpr(runtimeInExprList[i]))
708+
pkFilters = append(pkFilters, runtimeInExprList[i])
709709
}
710710
}
711711

@@ -720,10 +720,7 @@ func (s *Scope) handleRuntimeFilters(c *Compile, runtimeInExprList []*plan.Expr)
720720
if !ok {
721721
panic("missing instruction for runtime filter!")
722722
}
723-
err := arg.SetRuntimeExpr(s.Proc, nonPkFilters)
724-
if err != nil {
725-
return nil, err
726-
}
723+
arg.RuntimeFilterExprs = nonPkFilters
727724
}
728725

729726
// reset datasource
@@ -741,10 +738,7 @@ func (s *Scope) handleRuntimeFilters(c *Compile, runtimeInExprList []*plan.Expr)
741738
}
742739
}
743740

744-
newExprList := plan2.DeepCopyExprList(runtimeInExprList)
745-
newExprList = append(newExprList, s.DataSource.BlockFilterList...)
746-
747-
return newExprList, nil
741+
return append(runtimeInExprList, s.DataSource.BlockFilterList...), nil
748742
}
749743

750744
func (s *Scope) isTableScan() bool {

0 commit comments

Comments
 (0)