Skip to content
Merged
10 changes: 8 additions & 2 deletions pkg/container/vector/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func GetArrayAt2[T types.RealNumbers](v *Vector, bs []types.Varlena, i int) []T
// WARNING: GetAny() return value with any type will cause memory escape to heap which will result in slow GC.
// If you know the actual type, better use the GetFixedAtWithTypeCheck() to get the values.
// Only use when you have no choice, e.g. you are dealing with column with any type that don't know in advanced.
func GetAny(vec *Vector, i int) any {
func GetAny(vec *Vector, i int, deepCopy bool) any {
switch vec.typ.Oid {
case types.T_bool:
return GetFixedAtNoTypeCheck[bool](vec, i)
Expand Down Expand Up @@ -425,7 +425,13 @@ func GetAny(vec *Vector, i int) any {
return GetFixedAtNoTypeCheck[types.Blockid](vec, i)
case types.T_char, types.T_varchar, types.T_binary, types.T_varbinary, types.T_json, types.T_blob, types.T_text,
types.T_array_float32, types.T_array_float64, types.T_datalink:
return vec.GetBytesAt(i)
ret := vec.GetBytesAt(i)
if deepCopy {
copied := make([]byte, len(ret))
copy(copied, ret)
ret = copied
}
return ret
}
return nil
}
Expand Down
41 changes: 21 additions & 20 deletions pkg/container/vector/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package vector

import (
"fmt"
"golang.org/x/exp/rand"
"slices"
"strings"
"testing"

"golang.org/x/exp/rand"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
Expand Down Expand Up @@ -2414,7 +2415,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_int8.ToType())
err := AppendFixed(v, int8(0), false, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
v.Free(mp)
require.Equal(t, int8(0), s.(int8))
}
Expand All @@ -2423,7 +2424,7 @@ func TestGetAny(t *testing.T) {
w := NewVec(types.T_varchar.ToType())
err := AppendBytes(w, []byte("x"), false, mp)
require.NoError(t, err)
s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, []byte("x"), s.([]byte))
w.Free(mp)
}
Expand All @@ -2432,7 +2433,7 @@ func TestGetAny(t *testing.T) {
w := NewVec(types.T_bool.ToType())
err := AppendFixedList(w, []bool{true, false, true, false}, nil, mp)
require.NoError(t, err)
s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, true, s.(bool))

w.Free(mp)
Expand All @@ -2444,7 +2445,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []int8{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, int8(1), s.(int8))

w.Free(mp)
Expand All @@ -2456,7 +2457,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []int16{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, int16(1), s.(int16))

w.Free(mp)
Expand All @@ -2468,7 +2469,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []int32{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, int32(1), s.(int32))

w.Free(mp)
Expand All @@ -2480,7 +2481,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []int64{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, int64(1), s.(int64))

w.Free(mp)
Expand All @@ -2492,7 +2493,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []uint8{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, uint8(1), s.(uint8))

w.Free(mp)
Expand All @@ -2504,7 +2505,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []uint16{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, uint16(1), s.(uint16))

w.Free(mp)
Expand All @@ -2516,7 +2517,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []uint32{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, uint32(1), s.(uint32))

w.Free(mp)
Expand All @@ -2528,7 +2529,7 @@ func TestGetAny(t *testing.T) {
err := AppendFixedList(w, []uint64{1, 2, 3, 4}, nil, mp)
require.NoError(t, err)

s := GetAny(w, 0)
s := GetAny(w, 0, false)
require.Equal(t, uint64(1), s.(uint64))

w.Free(mp)
Expand All @@ -2540,7 +2541,7 @@ func TestGetAny(t *testing.T) {
err := AppendBytesList(v, [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4")}, nil, mp)
require.NoError(t, err)

s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, []byte("1"), s.([]byte))

v.Free(mp)
Expand All @@ -2551,7 +2552,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_time.ToType())
err := AppendFixedList(v, []types.Time{12 * 3600 * 1000 * 1000, 2, 3, 4}, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.Time(12*3600*1000*1000), s.(types.Time))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2561,7 +2562,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_timestamp.ToType())
err := AppendFixedList(v, []types.Timestamp{10000000, 2, 3, 4}, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.Timestamp(10000000), s.(types.Timestamp))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2573,7 +2574,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(typ)
err := AppendFixedList(v, []types.Decimal64{1234, 2000}, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.Decimal64(1234), s.(types.Decimal64))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2585,7 +2586,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(typ)
err := AppendFixedList(v, []types.Decimal128{{B0_63: 1234, B64_127: 0}, {B0_63: 2345, B64_127: 0}}, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.Decimal128{B0_63: 1234, B64_127: 0}, s.(types.Decimal128))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2596,7 +2597,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_uuid.ToType())
err := AppendFixedList(v, vs, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, "00000000-0000-0000-0000-000000000000", fmt.Sprint(s))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2607,7 +2608,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_TS.ToType())
err := AppendFixedList(v, vs, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.TS(types.TS{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}), s)
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand All @@ -2618,7 +2619,7 @@ func TestGetAny(t *testing.T) {
v := NewVec(types.T_Rowid.ToType())
err := AppendFixedList(v, vs, nil, mp)
require.NoError(t, err)
s := GetAny(v, 0)
s := GetAny(v, 0, false)
require.Equal(t, types.Rowid(types.Rowid{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}), s)
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
Expand Down
81 changes: 50 additions & 31 deletions pkg/sql/colexec/table_function/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package table_function

import (
"context"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -225,20 +226,23 @@ func fulltextIndexScanPrepare(proc *process.Process, tableFunction *TableFunctio
}

// run SQL to get the (doc_id, word_index) of all patterns (words) in the search string
func runWordStats(u *fulltextState, proc *process.Process, s *fulltext.SearchAccum) (executor.Result, error) {

sql, err := fulltext.PatternToSql(s.Pattern, s.Mode, s.TblName, u.param.Parser, s.ScoreAlgo)
if err != nil {
return executor.Result{}, err
func runWordStats(
ctx context.Context,
u *fulltextState,
proc *process.Process,
s *fulltext.SearchAccum,
) (result executor.Result, err error) {

var sql string
if sql, err = fulltext.PatternToSql(
s.Pattern, s.Mode, s.TblName, u.param.Parser, s.ScoreAlgo,
); err != nil {
return
}

//logutil.Infof("SQL is %s", sql)
res, err := ft_runSql_streaming(proc, sql, u.stream_chan, u.errors)
if err != nil {
return executor.Result{}, err
}
result, err = ft_runSql_streaming(ctx, proc, sql, u.stream_chan, u.errors)

return res, nil
return
}

// evaluate the score for all document vectors in Agg hashtable.
Expand Down Expand Up @@ -320,7 +324,7 @@ func groupby(u *fulltextState, proc *process.Process, s *fulltext.SearchAccum) (

for i := 0; i < bat.RowCount(); i++ {
// doc_id any
doc_id := vector.GetAny(bat.Vecs[0], i)
doc_id := vector.GetAny(bat.Vecs[0], i, false)

bytes, ok := doc_id.([]byte)
if ok {
Expand Down Expand Up @@ -437,8 +441,15 @@ func runCountStar(proc *process.Process, s *fulltext.SearchAccum) (executor.Resu
return res, nil
}

func fulltextIndexMatch(u *fulltextState, proc *process.Process, tableFunction *TableFunction, srctbl, tblname, pattern string,
mode int64, scoreAlgo fulltext.FullTextScoreAlgo, bat *batch.Batch) (err error) {
func fulltextIndexMatch(
u *fulltextState,
proc *process.Process,
tableFunction *TableFunction,
srctbl, tblname, pattern string,
mode int64,
scoreAlgo fulltext.FullTextScoreAlgo,
bat *batch.Batch,
) (err error) {

opStats := tableFunction.OpAnalyzer.GetOpStats()

Expand Down Expand Up @@ -469,42 +480,50 @@ func fulltextIndexMatch(u *fulltextState, proc *process.Process, tableFunction *
// we should wait the goroutine exit completely here,
// even the SQL stream is done inside the `runWordStats`.
// or will be resulting in data race on the tableFunction.
waiter := sync.WaitGroup{}
waiter.Add(1)

defer func() {
waiter.Wait()
}()
var (
waiter sync.WaitGroup
ctx, cancel = context.WithCancelCause(proc.GetTopContext())
)
defer cancel(nil)

waiter.Add(1)
go func() {
defer func() {
waiter.Done()
}()
defer waiter.Done()

// get the statistic of search string ([]Pattern) and store in SearchAccum
res, err := runWordStats(u, proc, u.sacc)
if err != nil {
u.errors <- err
res, err2 := runWordStats(ctx, u, proc, u.sacc)
if err2 != nil {
u.errors <- err2
return
}

opStats.BackgroundQueries = append(opStats.BackgroundQueries, res.LogicalPlan)
}()

// get batch from SQL executor
sql_closed := false
for !sql_closed {
sql_closed, err = groupby(u, proc, u.sacc)
if err != nil {
return err
if sql_closed, err = groupby(u, proc, u.sacc); err != nil {
// notify the producer to stop the sql streaming
cancel(err)
break
}
}

// wait for the sql streaming to be closed. make sure all the remaining
// results in stream_chan are closed.
if !sql_closed {
for res := range u.stream_chan {
res.Close()
}
}

waiter.Wait()

/*
t2 := time.Now()
diff := t2.Sub(t1)
os.Stderr.WriteString(fmt.Sprintf("FULLTEXT: diff %v\n", diff))
os.Stderr.WriteString(u.mpool.String())
*/
return nil
return
}
10 changes: 8 additions & 2 deletions pkg/sql/colexec/table_function/fulltext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package table_function

import (
"context"
"math/rand"
"testing"

Expand Down Expand Up @@ -118,8 +119,13 @@ func fake_runSql(proc *process.Process, sql string) (executor.Result, error) {
return executor.Result{Mp: proc.Mp(), Batches: []*batch.Batch{makeCountBatchFT(proc)}}, nil
}

func fake_runSql_streaming(proc *process.Process, sql string, ch chan executor.Result, err_chan chan error) (executor.Result, error) {

func fake_runSql_streaming(
ctx context.Context,
proc *process.Process,
sql string,
ch chan executor.Result,
err_chan chan error,
) (executor.Result, error) {
defer close(ch)
res := executor.Result{Mp: proc.Mp(), Batches: []*batch.Batch{makeTextBatchFT(proc)}}
ch <- res
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/table_function/fulltext_tokenize.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (u *tokenizeState) start(tf *TableFunction, proc *process.Process, nthRow i
vlen := len(tf.ctr.argVecs)

idVec := tf.ctr.argVecs[0]
id := vector.GetAny(idVec, nthRow)
id := vector.GetAny(idVec, nthRow, true)

isnull := false
for i := 1; i < vlen; i++ {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/table_scan/table_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (tableScan *TableScan) Prepare(proc *process.Process) (err error) {
}
err = tableScan.PrepareProjection(proc)
if tableScan.ctr.buf == nil {
tableScan.ctr.buf = batch.NewWithSize(len(tableScan.Types))
tableScan.ctr.buf = batch.NewOffHeapWithSize(len(tableScan.Types))
tableScan.ctr.buf.Attrs = append(tableScan.ctr.buf.Attrs, tableScan.Attrs...)
for i := range tableScan.Types {
tableScan.ctr.buf.Vecs[i] = vector.NewVec(plan.MakeTypeByPlan2Type(tableScan.Types[i]))
tableScan.ctr.buf.Vecs[i] = vector.NewOffHeapVecWithType(plan.MakeTypeByPlan2Type(tableScan.Types[i]))
}
}
return
Expand Down
Loading
Loading