
Conversation

XuPeng-SH
Contributor

@XuPeng-SH XuPeng-SH commented Sep 2, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #15191

What this PR does / why we need it:

  1. Use off-heap memory for the table_scan buffer and the output operator
  2. Fix some potential memory leaks

PR Type

Enhancement, Bug fix


Description

  • Improve IVF flat index query performance with off-heap memory

  • Fix potential memory leaks in streaming SQL operations

  • Add proper context cancellation and resource cleanup

  • Enhance error handling in vector index operations


Diagram Walkthrough

flowchart LR
  A["Table Scan"] --> B["Off-heap Memory"]
  C["SQL Streaming"] --> D["Context Cancellation"]
  E["Vector Index"] --> F["Resource Cleanup"]
  B --> G["Performance Improvement"]
  D --> H["Memory Leak Prevention"]
  F --> H

File Walkthrough

Relevant files
Enhancement
5 files
fulltext.go
Add context cancellation and resource cleanup                       
+49/-30 
table_scan.go
Use off-heap memory for table scan buffer                               
+2/-2     
search.go
Add context cancellation and resource cleanup                       
+60/-34 
search.go
Improve search performance and memory management                 
+86/-37 
sqlexec.go
Add context parameter to streaming SQL execution                 
+10/-3   
Tests
3 files
fulltext_test.go
Update test function signature with context                           
+8/-2     
search_test.go
Update mock function with context parameter                           
+8/-1     
search_test.go
Update test functions and fix cancellation test                   
+36/-6   
Bug fix
1 file
sql_executor.go
Fix memory handling and context cancellation                         
+6/-2     

@XuPeng-SH XuPeng-SH requested a review from cpegeric September 2, 2025 15:04

qodo-merge-pro bot commented Sep 2, 2025

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

Switching from Dup to Clone for batch copying changes semantics and ownership. Verify that Clone deep-copies all vectors and metadata needed by downstream consumers, and that off-heap/on-heap compatibility is maintained, to avoid use-after-free or unexpected memory pressure.

err = c.Compile(
	exec.ctx,
	pn,
	func(bat *batch.Batch, crs *perfcounter.CounterSet) error {
		if bat != nil {
			// the bat is valid only in the current method, so we must copy the data.
			// FIXME: add a custom streaming apply handler to consume the read data.
			// Our current internal SQL will never read too much data.
			rows, err := bat.Clone(exec.s.mp, true)
			if err != nil {
				return err
			}
			if streaming {
				stream_result := executor.NewResult(exec.s.mp)
				for len(stream_chan) == cap(stream_chan) {
					select {
					case <-proc.Ctx.Done():
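The excerpt above clones the batch because the callback's argument is only valid for the duration of the call. A toy illustration of that aliasing hazard (the `Batch`/`Clone` types here are stand-ins, not matrixone's real ones):

```go
package main

import "fmt"

// Batch is a stand-in for batch.Batch; the engine reuses its backing
// memory between callback invocations.
type Batch struct{ Vals []int64 }

// Clone deep-copies the batch, like the bat.Clone call in the diff.
func (b *Batch) Clone() *Batch {
	c := &Batch{Vals: make([]int64, len(b.Vals))}
	copy(c.Vals, b.Vals)
	return c
}

// scan mimics the streaming apply callback: the same buffer is overwritten
// on every pass, so it is only valid inside the callback.
func scan(apply func(*Batch)) {
	buf := &Batch{Vals: make([]int64, 3)}
	for pass := int64(0); pass < 2; pass++ {
		for j := range buf.Vals {
			buf.Vals[j] = pass*10 + int64(j)
		}
		apply(buf)
	}
}

func main() {
	var retained, cloned []*Batch
	scan(func(b *Batch) {
		retained = append(retained, b)     // wrong: aliases reused memory
		cloned = append(cloned, b.Clone()) // right: deep copy
	})
	fmt.Println(retained[0].Vals) // [10 11 12] -- overwritten by the second pass
	fmt.Println(cloned[0].Vals)   // [0 1 2]
}
```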
Resource Cleanup

Ensure the temporary file is always closed and removed on every error path after streaming starts and checksum validation runs; confirm there are no goroutine leaks and that cancellation reliably drains and closes stream_chan to prevent leaks.

func (idx *HnswSearchIndex) LoadIndex(
	proc *process.Process,
	idxcfg vectorindex.IndexConfig,
	tblcfg vectorindex.IndexTableConfig,
	nthread int64,
) (err error) {

	var (
		fp          *os.File
		stream_chan = make(chan executor.Result, 2)
		error_chan  = make(chan error)
	)

	// create tempfile for writing
	if fp, err = os.CreateTemp("", "hnswindx"); err != nil {
		return
	}
	defer func() {
		fp.Close()
		os.Remove(fp.Name())
	}()

	if err = fallocate.Fallocate(fp, 0, idx.Filesize); err != nil {
		return
	}

	// run streaming sql
	sql := fmt.Sprintf(
		"SELECT chunk_id, data from `%s`.`%s` WHERE index_id = '%s'",
		tblcfg.DbName, tblcfg.IndexTable, idx.Id,
	)

	ctx, cancel := context.WithCancelCause(proc.GetTopContext())
	defer cancel(nil)

	go func() {
		if _, err2 := runSql_streaming(
			ctx, proc, sql, stream_chan, error_chan,
		); err2 != nil {
			error_chan <- err2
			return
		}
	}()

	// incremental load from database
	sql_closed := false
	for !sql_closed {
		if sql_closed, err = idx.loadChunk(proc, stream_chan, error_chan, fp); err != nil {
			// notify the producer to stop the sql streaming
			cancel(err)
			break
		}
	}

	// wait for the sql streaming to be closed. make sure all the remaining
	// results in stream_chan are closed.
	if !sql_closed {
		for res := range stream_chan {
			res.Close()
		}
	}

	if err != nil {
		return
	}

	// check checksum
	var chksum string
	if chksum, err = vectorindex.CheckSum(fp.Name()); err != nil {
		return
	}
	if chksum != idx.Checksum {
		return moerr.NewInternalError(ctx, "Checksum mismatch with the index file")
	}

	var usearchidx *usearch.Index
	if usearchidx, err = usearch.NewIndex(idxcfg.Usearch); err != nil {
		return
	}

	if err = usearchidx.ChangeThreadsSearch(uint(nthread)); err != nil {
		return
	}

	if err = usearchidx.Load(fp.Name()); err != nil {
		return
	}

	idx.Index = usearchidx

	return
}
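The temp-file lifecycle in LoadIndex (create, defer close+remove on every return path, write, then hash) can be sketched in isolation. SHA-256 is an assumption here; the real `vectorindex.CheckSum` may use a different algorithm.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// fileChecksum mirrors the LoadIndex temp-file flow in miniature: create a
// temp file, guarantee close+remove via defer on every return path, write
// the data, then hash the file contents.
func fileChecksum(data []byte) (sum string, err error) {
	fp, err := os.CreateTemp("", "hnswindx")
	if err != nil {
		return "", err
	}
	defer func() {
		fp.Close()
		os.Remove(fp.Name())
	}()

	if _, err = fp.Write(data); err != nil {
		return "", err
	}
	if _, err = fp.Seek(0, io.SeekStart); err != nil {
		return "", err
	}
	h := sha256.New()
	if _, err = io.Copy(h, fp); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	sum, err := fileChecksum([]byte("hello"))
	fmt.Println(sum, err)
}
```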
Concurrency/Cancellation

The new cancellation flow introduces shared stream-channel consumption by multiple workers; validate that error/cancel paths cannot double-close results or leave them unclosed, and that cause propagation and draining work under high contention without deadlock.

		tblcfg.DbName, tblcfg.EntriesTable,
		catalog.SystemSI_IVFFLAT_TblCol_Entries_version,
		idx.Version,
		catalog.SystemSI_IVFFLAT_TblCol_Entries_id,
		instr,
	)
	//os.Stderr.WriteString(sql)

	var (
		wg          sync.WaitGroup
		ctx, cancel = context.WithCancelCause(proc.GetTopContext())
	)
	defer cancel(nil)

	wg.Add(1)
	go func() {
		defer wg.Done()
		if _, err2 := runSql_streaming(
			ctx, proc, sql, stream_chan, error_chan,
		); err2 != nil {
			// consumer notify the producer to stop the sql streaming
			cancel(err2)
			return
		}
	}()

	var (
		heap = vectorindex.NewSearchResultSafeHeap(int(rt.Probe * 1000))
	)

	for n := int64(0); n < nthread; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// brute force search with selected centroids
			var (
				streamClosed = false
				err2         error
			)
			for !streamClosed {
				if streamClosed, err2 = idx.searchEntries(
					ctx, proc, query, distfn, heap, stream_chan, error_chan,
				); err2 != nil {
					// consumer notify the producer to stop the sql streaming
					cancel(err2)
					break
				}
			}
			// If the stream is not yet closed and searchEntries returned an
			// error, we must wait for stream_chan to be closed. Otherwise,
			// the remaining results in stream_chan will never be closed
			// (memory leak).
			// For example:
			// 1. Producer sends one batch into stream_chan
			// 2. Consumer fetches one batch from stream_chan and hits an error
			// 3. Producer is still sending batches into stream_chan
			// 4. Consumer breaks the consume loop and returns
			// 5. The remaining results in stream_chan are never closed (memory leak).

			// Right steps:
			// 1. Producer sends one batch into stream_chan
			// 2. Consumer fetches one batch from stream_chan and hits an error
			// 3. Producer sends one batch into stream_chan
			// 4. Consumer notifies the producer to stop streaming
			// 5. Consumer fetches the remaining batches from stream_chan until it is closed.
			if !streamClosed {
				for res := range stream_chan {
					res.Close()
				}
			}
		}()
	}

	wg.Wait()

	// check local context cancelled
	select {
	case <-proc.Ctx.Done():
		err = proc.Ctx.Err()
		return
	case <-ctx.Done():
		err = context.Cause(ctx)
		return
	default:
	}

	distances = make([]float64, 0, rt.Limit)
	var (
		resid = make([]any, 0, rt.Limit)
		n     = heap.Len()
	)
	for i := 0; i < int(rt.Limit) && i < n; i++ {
		srif := heap.Pop()
		sr, ok := srif.(*vectorindex.SearchResultAnyKey)
		if !ok {
			err = moerr.NewInternalError(proc.Ctx, "ivf search: heap return key is not any")
			return
		}
		resid = append(resid, sr.Id)
		distances = append(distances, sr.Distance)
	}

	return resid, distances, nil
}
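The producer/consumer drain protocol described in the comments above can be demonstrated in a self-contained sketch (with a stand-in `Result` type, not `executor.Result`): every produced result gets closed even when the consumer errors mid-stream.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Result stands in for executor.Result: a value that leaks unless Close is
// called on it. Close bumps a caller-supplied counter so leaks are visible.
type Result struct{ id int }

func (r Result) Close(closed *int) { *closed++ }

// run wires up one producer and one consumer. The consumer fails on the
// third result, cancels the producer with a cause, then drains the channel
// so buffered results are still closed.
func run() (produced, closed int, cause error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	stream := make(chan Result, 2)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // producer: stops on cancellation, always closes the channel
		defer wg.Done()
		defer close(stream)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case stream <- Result{i}:
				produced++
			}
		}
	}()

	consumed := 0
	for res := range stream {
		res.Close(&closed)
		if consumed++; consumed == 3 {
			cancel(errors.New("consumer error")) // notify the producer
			break
		}
	}
	for res := range stream { // drain until the producer closes the channel
		res.Close(&closed)
	}
	wg.Wait() // produced is safe to read after this
	return produced, closed, context.Cause(ctx)
}

func main() {
	produced, closed, cause := run()
	fmt.Println(produced == closed, cause) // true consumer error
}
```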

@matrix-meow matrix-meow added size/XL Denotes a PR that changes [1000, 1999] lines and removed size/L Denotes a PR that changes [500,999] lines labels Sep 5, 2025
@XuPeng-SH
Contributor Author

@mergify refresh

Contributor

mergify bot commented Sep 5, 2025

refresh

✅ Pull request refreshed

@mergify mergify bot merged commit 997fdc9 into matrixorigin:main Sep 5, 2025
19 checks passed
Labels
kind/enhancement Review effort 3/5 size/XL Denotes a PR that changes [1000, 1999] lines
7 participants