Skip to content

Commit 13c1e5e

Browse files
committed
Introduce KV mutex
1 parent 68274d1 commit 13c1e5e

File tree

6 files changed

+123
-6
lines changed

6 files changed

+123
-6
lines changed

GNUmakefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ generate:
1414
# Run acceptance tests
1515
.PHONY: testacc
1616
testacc:
17-
TF_ACC=1 go test ./... -v $(TESTARGS) -coverprofile=coverage.out -timeout 120m
17+
TF_ACC=1 go test ./... -v $(TESTARGS) -race -coverprofile=coverage.out -timeout 120m
1818

1919
# Run acceptance tests and show coverages
2020
.PHONY: testacc-cover

internal/mutex/kv.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package mutex
2+
3+
import (
4+
"log"
5+
"sync"
6+
)
7+
8+
// KV is a simple key/value store for arbitrary mutexes. It can be used to
9+
// serialize changes across arbitrary collaborators that share knowledge of the
10+
// keys they must serialize on.
11+
//
12+
type KV struct {
13+
lock sync.Mutex
14+
store map[string]*sync.Mutex
15+
}
16+
17+
// NewKV returns a properly initialized KV
18+
func NewKV() *KV {
19+
return &KV{
20+
store: make(map[string]*sync.Mutex),
21+
}
22+
}
23+
24+
// Lock the mutex for the given key. Caller is responsible for calling Unlock
25+
// for the same key
26+
func (m *KV) Lock(key string) {
27+
log.Printf("[DEBUG] Locking %q", key)
28+
m.get(key).Lock()
29+
log.Printf("[DEBUG] Locked %q", key)
30+
}
31+
32+
// Unlock the mutex for the given key. Caller must have called Lock for the same key first
33+
func (m *KV) Unlock(key string) {
34+
log.Printf("[DEBUG] Unlocking %q", key)
35+
m.get(key).Unlock()
36+
log.Printf("[DEBUG] Unlocked %q", key)
37+
}
38+
39+
// Returns a mutex for the given key, no guarantee of its lock status
40+
func (m *KV) get(key string) *sync.Mutex {
41+
m.lock.Lock()
42+
defer m.lock.Unlock()
43+
mutex, ok := m.store[key]
44+
if !ok {
45+
mutex = &sync.Mutex{}
46+
m.store[key] = mutex
47+
}
48+
return mutex
49+
}

internal/mutex/kv_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package mutex
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestKV(t *testing.T) {
10+
t.Parallel()
11+
12+
mutexKV := NewKV()
13+
start := time.Now()
14+
15+
wg := sync.WaitGroup{}
16+
wg.Add(1)
17+
go func() {
18+
defer wg.Done()
19+
20+
mutexKV.Lock("test")
21+
time.Sleep(100 * time.Millisecond)
22+
mutexKV.Unlock("test")
23+
}()
24+
25+
wg.Add(1)
26+
go func() {
27+
defer wg.Done()
28+
29+
mutexKV.Lock("test")
30+
time.Sleep(100 * time.Millisecond)
31+
mutexKV.Unlock("test")
32+
}()
33+
wg.Wait()
34+
35+
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
36+
t.Errorf("TestKV() elapsed time = %v, want %v", elapsed, 200*time.Millisecond)
37+
}
38+
}

internal/provider/provider.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package provider
22

33
import (
44
"context"
5-
65
"github.com/algolia/algoliasearch-client-go/v3/algolia/region"
76
"github.com/algolia/algoliasearch-client-go/v3/algolia/search"
87
"github.com/algolia/algoliasearch-client-go/v3/algolia/suggestions"
@@ -11,8 +10,12 @@ import (
1110
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/logging"
1211
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
1312
"github.com/hashicorp/terraform-provider-algolia/internal/algoliautil"
13+
"github.com/hashicorp/terraform-provider-algolia/internal/mutex"
1414
)
1515

16+
// Global Key/Value Mutex
17+
var mutexKV = mutex.NewKV()
18+
1619
// nolint: gochecknoinits
1720
func init() {
1821
schema.DescriptionKind = schema.StringMarkdown

internal/provider/resource_index.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package provider
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"strconv"
78
"time"
@@ -583,8 +584,14 @@ func resourceIndexCreate(ctx context.Context, d *schema.ResourceData, m interfac
583584

584585
indexName := d.Get("name").(string)
585586

586-
if primaryIndexName, ok := d.GetOk("primary_index_name"); ok {
587-
primaryIndex := apiClient.searchClient.InitIndex(primaryIndexName.(string))
587+
if v, ok := d.GetOk("primary_index_name"); ok {
588+
primaryIndexName := v.(string)
589+
// Modifying the primary's replica setting on primary can cause problems if other replicas
590+
// are modifying it at the same time. Lock the primary until we're done in order to prevent that.
591+
mutexKV.Lock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
592+
defer mutexKV.Unlock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
593+
594+
primaryIndex := apiClient.searchClient.InitIndex(primaryIndexName)
588595
primaryIndexSettings, err := primaryIndex.GetSettings(ctx)
589596
if err != nil {
590597
return diag.FromErr(err)
@@ -653,8 +660,14 @@ func resourceIndexDelete(ctx context.Context, d *schema.ResourceData, m interfac
653660
apiClient := m.(*apiClient)
654661
indexName := d.Id()
655662

656-
if primaryIndexName, ok := d.GetOk("primary_index_name"); ok {
657-
primaryIndex := apiClient.searchClient.InitIndex(primaryIndexName.(string))
663+
if v, ok := d.GetOk("primary_index_name"); ok {
664+
primaryIndexName := v.(string)
665+
// Modifying the primary's replica setting on primary can cause problems if other replicas
666+
// are modifying it at the same time. Lock the primary until we're done in order to prevent that.
667+
mutexKV.Lock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
668+
defer mutexKV.Unlock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
669+
670+
primaryIndex := apiClient.searchClient.InitIndex(primaryIndexName)
658671
primaryIndexSettings, err := primaryIndex.GetSettings(ctx)
659672
if err != nil {
660673
return diag.FromErr(err)
@@ -1224,3 +1237,7 @@ func unmarshalAdvancedConfig(configured interface{}, settings *search.Settings,
12241237
}
12251238
}
12261239
}
1240+
1241+
func algoliaIndexMutexKey(appID string, indexName string) string {
1242+
return fmt.Sprintf("%s-algolia-index-%s", appID, indexName)
1243+
}

internal/provider/resource_virtual_index.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,11 @@ func resourceVirtualIndexCreate(ctx context.Context, d *schema.ResourceData, m i
583583
}
584584
}
585585
if !virtualReplicaExist {
586+
// Modifying the primary's replica setting on primary can cause problems if other replicas
587+
// are modifying it at the same time. Lock the primary until we're done in order to prevent that.
588+
mutexKV.Lock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
589+
defer mutexKV.Unlock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
590+
586591
newReplicas := append(primaryIndexSettings.Replicas.Get(), fmt.Sprintf("virtual(%s)", indexName))
587592
res, err := primaryIndex.SetSettings(search.Settings{
588593
Replicas: opt.Replicas(newReplicas...),
@@ -640,6 +645,11 @@ func resourceVirtualIndexDelete(ctx context.Context, d *schema.ResourceData, m i
640645
indexName := d.Id()
641646

642647
primaryIndexName := d.Get("primary_index_name").(string)
648+
// Modifying the primary's replica setting on primary can cause problems if other replicas
649+
// are modifying it at the same time. Lock the primary until we're done in order to prevent that.
650+
mutexKV.Lock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
651+
defer mutexKV.Unlock(algoliaIndexMutexKey(apiClient.appID, primaryIndexName))
652+
643653
primaryIndex := apiClient.searchClient.InitIndex(primaryIndexName)
644654
primaryIndexSettings, err := primaryIndex.GetSettings(ctx)
645655
if err != nil {

0 commit comments

Comments
 (0)