@cpegeric cpegeric commented Jul 7, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22117

What this PR does / why we need it:

The loop index generated by a for loop must be passed to the go func as an argument.
This PR also fixes some race conditions.


PR Type

Bug fix


Description

  • Fix goroutine closure variable capture bug in concurrent loops

  • Pass loop index as parameter to avoid race conditions

  • Update 8 files with proper thread ID handling
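
To illustrate the pattern described above, here is a minimal sketch of passing the loop index into the goroutine as a parameter. The function name `squares` is hypothetical and not taken from the PR. Before Go 1.22, a `for` loop variable was shared across iterations, so a closure started with `go func()` could observe a later value of `i`; passing it as `tid` gives each goroutine its own copy under any Go version. Which Go version this repository targets is not stated here.

```go
package main

import (
	"fmt"
	"sync"
)

// squares fills out[i] with i*i using one goroutine per index.
// The loop index is passed as the argument tid, so each goroutine
// works on its own copy of the value.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(tid int) {
			defer wg.Done()
			out[tid] = tid * tid // each goroutine writes a distinct element
		}(i)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squares(5)) // prints [0 1 4 9 16]
}
```

Each goroutine writes to a different slice element, so no further synchronization is needed beyond the `sync.WaitGroup`.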


Changes diagram

flowchart LR
  A["for loop with index i"] --> B["go func()"]
  B --> C["captures i by reference"]
  C --> D["race condition bug"]
  A --> E["go func(tid int)"]
  E --> F["passes i as parameter"]
  F --> G["fixed thread safety"]

Changes walkthrough 📝

Relevant files
Bug fix
7 files
fixedbytepool.go
Fix goroutine closure in partition spilling                           
+3/-3     
product_l2.go
Fix thread ID parameter in probe function                               
+3/-3     
build_test.go
Fix goroutine closure in HNSW build tests                               
+12/-12 
search.go
Fix thread ID in HNSW search goroutines                                   
+3/-3     
clusterer.go
Fix goroutine closure in Elkan's clustering algorithm       
+9/-9     
initializer.go
Fix thread ID in K-means++ initialization                               
+3/-3     
search.go
Fix goroutine closure in IVF-flat search                                 
+3/-3     


    qodo-merge-pro bot commented Jul 7, 2025

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Race Condition

    The errs variable is accessed concurrently without proper synchronization. Multiple goroutines may write to it simultaneously causing data races.

    var errs error
    for i := 0; i < int(nspill); i++ {
    	wg.Add(1)
    
    	go func(tid int) {
    		defer wg.Done()
    		err := pool.partitions[lru[tid].id].Spill()
    		if err != nil {
    			errs = errors.Join(errs, err)
    		}
    Race Condition

    The failed variable is accessed concurrently without atomic operations or mutex protection, which can lead to race conditions when multiple goroutines increment it.

    failed := 0
    var wg2 sync.WaitGroup
    for j := 0; j < nthread; j++ {
    	wg2.Add(1)
    	go func(tid int) {
    		defer wg2.Done()
    		for i := 0; i < nitem; i++ {
    			key := int64(tid*nitem + i)
    			anykeys, distances, err := search.Search(nil, sample[key], vectorindex.RuntimeConfig{Limit: 10})
    			require.Nil(t, err)
    			keys, ok := anykeys.([]int64)
    			require.True(t, ok)
    			_ = distances
    			if keys[0] != key {
    				failed++

    @mergify mergify bot added the kind/bug Something isn't working label Jul 7, 2025

    qodo-merge-pro bot commented Jul 7, 2025

    PR Code Suggestions ✨

    Latest suggestions up to ecf2027

    Category    Suggestion    Impact
    Incremental [*]
    Add bounds checking for slice access

    Add bounds checking for the keys slice before accessing keys[0] to prevent
    potential index out of bounds panic when the search returns an empty result set.

    pkg/vectorindex/hnsw/build_test.go [298-319]

     var failed atomic.Int64
     var wg2 sync.WaitGroup
     for j := 0; j < nthread; j++ {
         wg2.Add(1)
         go func(tid int) {
             defer wg2.Done()
             for i := 0; i < nitem; i++ {
                 key := int64(tid*nitem + i)
                 anykeys, distances, err := search.Search(nil, sample[key], vectorindex.RuntimeConfig{Limit: 10})
                 require.Nil(t, err)
                 keys, ok := anykeys.([]int64)
                 require.True(t, ok)
                 _ = distances
    -            if keys[0] != key {
    +            if len(keys) > 0 && keys[0] != key {
                     failed.Add(1)
                     found, err := search.Contains(key)
                     require.Nil(t, err)
                     require.True(t, found)
                 }
             }
         }(j)
     }
    Suggestion importance[1-10]: 8

    __

    Why: This suggestion correctly identifies a potential panic from an out-of-bounds access on keys[0] if the search returns an empty slice, which is a critical bug in a test.

    Medium
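
A note on the guarded condition suggested above: with `len(keys) > 0 && keys[0] != key`, an empty result no longer panics, but it is also not counted in `failed`. If an empty result should count as a miss, the check could be written as in the sketch below. The helper `isMiss` is hypothetical and does not appear in the PR.

```go
package main

import "fmt"

// isMiss reports whether a search for key should count as a failure.
// Hypothetical helper: an empty result is treated as a miss instead of
// being skipped, and keys[0] is only read when the slice is non-empty.
func isMiss(keys []int64, key int64) bool {
	return len(keys) == 0 || keys[0] != key
}

func main() {
	fmt.Println(isMiss(nil, 7))           // prints true
	fmt.Println(isMiss([]int64{7, 3}, 7)) // prints false
	fmt.Println(isMiss([]int64{3, 7}, 7)) // prints true
}
```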
    Eliminate redundant calculation in printf

    Avoid redundant calculation by reusing the already computed recall variable in
    the printf statement instead of recalculating the same expression.

    pkg/vectorindex/hnsw/build_test.go [328-329]

     recall := float32(nthread*nitem-int(failed.Load())) / float32(nthread*nitem)
    -fmt.Printf("Recall %f\n", float32(nthread*nitem-int(failed.Load()))/float32(nthread*nitem))
    +fmt.Printf("Recall %f\n", recall)
    Suggestion importance[1-10]: 4

    __

    Why: The suggestion correctly points out a redundant calculation and improves code readability and maintainability by reusing the recall variable.

    Low
    General
    Collect all concurrent errors properly

    The error handling only returns the first error from the channel, potentially
    losing other errors. Consider collecting all errors or using a more robust error
    aggregation approach.

    pkg/fulltext/fixedbytepool.go [436-438]

    -if len(errchan) > 0 {
    -	return <-errchan
    +close(errchan)
    +var errs []error
    +for err := range errchan {
    +	errs = append(errs, err)
    +}
    +if len(errs) > 0 {
    +	return errors.Join(errs...)
     }
    Suggestion importance[1-10]: 8

    __

    Why: The PR fixes a race condition but introduces a functional regression by only returning the first error, potentially hiding other important errors from concurrent operations.

    Medium
    Cache atomic load result

    The failed.Load() is called twice in the recall calculation, which could lead to
    inconsistent results if the value changes between calls. Store the loaded value
    in a variable.

    pkg/vectorindex/hnsw/build_test.go [328-329]

     var failed atomic.Int64
     ...
    -recall := float32(nthread*nitem-int(failed.Load())) / float32(nthread*nitem)
    -fmt.Printf("Recall %f\n", float32(nthread*nitem-int(failed.Load()))/float32(nthread*nitem))
    +failedCount := int(failed.Load())
    +recall := float32(nthread*nitem-failedCount) / float32(nthread*nitem)
    +fmt.Printf("Recall %f\n", float32(nthread*nitem-failedCount)/float32(nthread*nitem))

    [To ensure code accuracy, apply this suggestion manually]

    Suggestion importance[1-10]: 3

    __

    Why: While loading the atomic value once is good practice for readability and performance, there is no risk of inconsistency here as all goroutines modifying it have completed.

    Low

    Previous suggestions

    ✅ Suggestions up to commit fafc539
    Category    Suggestion    Impact
    Possible issue
    Fix concurrent access race condition
    Suggestion Impact:The suggestion was directly implemented. The commit changed the `failed` variable from a regular int to `atomic.Int64`, replaced `failed++` with `failed.Add(1)`, and updated the recall calculation to use `failed.Load()` to safely read the atomic value.

    code diff:

    -	failed := 0
    +	var failed atomic.Int64
     	var wg2 sync.WaitGroup
     	for j := 0; j < nthread; j++ {
     		wg2.Add(1)
    @@ -309,7 +309,7 @@
     				require.True(t, ok)
     				_ = distances
     				if keys[0] != key {
    -					failed++
    +					failed.Add(1)
     					found, err := search.Contains(key)
     					require.Nil(t, err)
     					require.True(t, found)
    @@ -325,8 +325,8 @@
     	require.Nil(t, err)
     	require.False(t, found)
     
    -	recall := float32(nthread*nitem-failed) / float32(nthread*nitem)
    -	fmt.Printf("Recall %f\n", float32(nthread*nitem-failed)/float32(nthread*nitem))
    +	recall := float32(nthread*nitem-int(failed.Load())) / float32(nthread*nitem)
    +	fmt.Printf("Recall %f\n", float32(nthread*nitem-int(failed.Load()))/float32(nthread*nitem))

    The failed variable is being modified concurrently by multiple goroutines
    without proper synchronization. This creates a race condition that can lead to
    incorrect failure counts.

    pkg/vectorindex/hnsw/build_test.go [297-319]

     // check recall
    -failed := 0
    +var failed atomic.Int64
     var wg2 sync.WaitGroup
     for j := 0; j < nthread; j++ {
         wg2.Add(1)
         go func(tid int) {
             defer wg2.Done()
             for i := 0; i < nitem; i++ {
                 key := int64(tid*nitem + i)
                 ...
                 if keys[0] != key {
    -                failed++
    +                failed.Add(1)
                     ...
                 }
             }
         }(j)
     }
    Suggestion importance[1-10]: 9

    __

    Why: The suggestion correctly identifies a data race on the failed variable inside a goroutine in TestBuildSingleThread and proposes using atomic operations, which is the correct fix and consistent with TestBuildMulti.

    High
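
A self-contained sketch of the atomic counter pattern described above, reduced to its essentials. The function `countMisses` and the predicate `hit` are hypothetical stand-ins for the search and comparison in the test; only `atomic.Int64`, `Add`, and `Load` mirror the diff.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countMisses checks nthread*nitem keys concurrently and returns how many
// of them fail the predicate hit (a stand-in for the search check).
func countMisses(nthread, nitem int, hit func(key int64) bool) int {
	var failed atomic.Int64
	var wg sync.WaitGroup
	for j := 0; j < nthread; j++ {
		wg.Add(1)
		go func(tid int) {
			defer wg.Done()
			for i := 0; i < nitem; i++ {
				key := int64(tid*nitem + i)
				if !hit(key) {
					failed.Add(1) // atomic increment, safe across goroutines
				}
			}
		}(j)
	}
	wg.Wait()
	// All writers are done, so a single Load gives the final count.
	return int(failed.Load())
}

func main() {
	nthread, nitem := 4, 25
	// Stand-in predicate: every key divisible by 10 counts as a miss.
	failed := countMisses(nthread, nitem, func(k int64) bool { return k%10 != 0 })
	recall := float32(nthread*nitem-failed) / float32(nthread*nitem)
	fmt.Printf("failed=%d recall=%.2f\n", failed, recall) // prints failed=10 recall=0.90
}
```

Running a test like this with `go test -race` is the usual way to confirm that the counter no longer triggers the race detector.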

    @cpegeric cpegeric changed the title from "BUG FIX: index generated by for loop didn't pass as go function argument leads to undetermined index" to "BUG FIX: index generated by for loop didn't pass as go function argument leads to undetermined index and some race conditions" Jul 7, 2025
    @mergify mergify bot merged commit 3a2a692 into matrixorigin:main Jul 7, 2025
    33 of 34 checks passed
    Labels
    kind/bug Something isn't working Review effort 2/5 size/S Denotes a PR that changes [10,99] lines
    4 participants