Skip to content

Commit ecfaeaa

Browse files
bbrkstorcolvin
authored andcommitted
CBG-2838: Rely on index 'IF NOT EXISTS' and idempotent deferred index builds fo… (#7161)
1 parent 0772bfe commit ecfaeaa

12 files changed

+357
-299
lines changed

base/cluster_n1ql.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ var _ N1QLStore = &ClusterOnlyN1QLStore{}
3030
// ClusterOnlyN1QLStore instances. Anticipates future refactoring of N1QLStore to differentiate between
3131
// collection-scoped and non-collection-scoped operations.
3232
type ClusterOnlyN1QLStore struct {
33-
cluster *gocb.Cluster
34-
bucketName string // User to build keyspace for query when not otherwise set
35-
scopeName string // Used to build keyspace for query when not otherwise set
36-
collectionName string // Used to build keyspace for query when not otherwise set
37-
supportsCollections bool
33+
cluster *gocb.Cluster
34+
bucketName string // User to build keyspace for query when not otherwise set
35+
scopeName string // Used to build keyspace for query when not otherwise set
36+
collectionName string // Used to build keyspace for query when not otherwise set
37+
supportsCollections bool
38+
supportsIfNotExistsInDDL bool // 7.1.0+ MB-38737
3839
}
3940

4041
func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error) {
@@ -51,11 +52,25 @@ func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, colle
5152
return nil, err
5253
}
5354
clusterOnlyn1qlStore.supportsCollections = isMinimumVersion(uint64(major), uint64(minor), 7, 0)
55+
clusterOnlyn1qlStore.supportsIfNotExistsInDDL = isMinimumVersion(uint64(major), uint64(minor), 7, 1)
5456

5557
return clusterOnlyn1qlStore, nil
5658

5759
}
5860

61+
func (cl *ClusterOnlyN1QLStore) IsSupported(feature sgbucket.BucketStoreFeature) bool {
62+
switch feature {
63+
case sgbucket.BucketStoreFeatureN1ql:
64+
return true
65+
case sgbucket.BucketStoreFeatureCollections:
66+
return cl.supportsCollections
67+
case sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL:
68+
return cl.supportsIfNotExistsInDDL
69+
default:
70+
return false
71+
}
72+
}
73+
5974
func (cl *ClusterOnlyN1QLStore) GetName() string {
6075
return cl.bucketName
6176
}
@@ -72,6 +87,10 @@ func (cl *ClusterOnlyN1QLStore) CreateIndex(ctx context.Context, indexName strin
7287
return CreateIndex(ctx, cl, indexName, expression, filterExpression, options)
7388
}
7489

90+
func (cl *ClusterOnlyN1QLStore) CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
91+
return CreateIndexIfNotExists(ctx, cl, indexName, expression, filterExpression, options)
92+
}
93+
7594
func (cl *ClusterOnlyN1QLStore) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
7695
return CreatePrimaryIndex(ctx, cl, indexName, options)
7796
}

base/collection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
244244
return false
245245
}
246246
return len(agent.N1qlEps()) > 0
247+
case sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL:
248+
return isMinimumVersion(b.clusterCompatMajorVersion, b.clusterCompatMinorVersion, 7, 1)
247249
// added in Couchbase Server 6.6
248250
case sgbucket.BucketStoreFeatureCreateDeletedWithXattr:
249251
status, err := b.bucket.Internal().CapabilityStatus(gocb.CapabilityCreateAsDeleted)

base/collection_n1ql.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ func (c *Collection) CreateIndex(ctx context.Context, indexName string, expressi
133133
return CreateIndex(ctx, c, indexName, expression, filterExpression, options)
134134
}
135135

136+
func (c *Collection) CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
137+
return CreateIndexIfNotExists(ctx, c, indexName, expression, filterExpression, options)
138+
}
139+
136140
func (c *Collection) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
137141
return CreatePrimaryIndex(ctx, c, indexName, options)
138142
}

base/collection_n1ql_common.go

Lines changed: 78 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package base
1212

1313
import (
1414
"context"
15+
"errors"
1516
"fmt"
1617
"slices"
1718
"strings"
@@ -51,6 +52,7 @@ type N1QLStore interface {
5152
GetName() string
5253
BuildDeferredIndexes(ctx context.Context, indexSet []string) error
5354
CreateIndex(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
55+
CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
5456
CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error
5557
DropIndex(ctx context.Context, indexName string) error
5658
ExplainQuery(ctx context.Context, statement string, params map[string]interface{}) (plan map[string]interface{}, err error)
@@ -75,6 +77,8 @@ type N1QLStore interface {
7577

7678
// waitUntilQueryServiceReady waits until the query service is ready to accept requests
7779
waitUntilQueryServiceReady(timeout time.Duration) error
80+
81+
sgbucket.BucketStoreFeatureIsSupported
7882
}
7983

8084
func ExplainQuery(ctx context.Context, store N1QLStore, statement string, params map[string]interface{}) (plan map[string]interface{}, err error) {
@@ -119,7 +123,7 @@ func (im *indexManager) GetAllIndexes() ([]gocb.QueryIndex, error) {
119123
return im.cluster.GetAllIndexes(im.bucketName, opts)
120124
}
121125

122-
// CreateIndex issues a CREATE INDEX query in the current bucket, using the form:
126+
// CreateIndex issues a CREATE INDEX query in the N1QLStore keyspace, using the form:
123127
//
124128
// CREATE INDEX indexName ON bucket.Name(expression) WHERE filterExpression WITH options
125129
//
@@ -128,31 +132,60 @@ func (im *indexManager) GetAllIndexes() ([]gocb.QueryIndex, error) {
128132
// CreateIndex("myIndex", "field1, field2, nested.field", "field1 > 0", N1qlIndexOptions{numReplica:1})
129133
// CREATE INDEX myIndex on myBucket(field1, field2, nested.field) WHERE field1 > 0 WITH {"numReplica":1}
130134
func CreateIndex(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
131-
createStatement := fmt.Sprintf("CREATE INDEX `%s` ON %s(%s)", indexName, store.EscapedKeyspace(), expression)
135+
return createIndex(ctx, store, indexName, expression, filterExpression, false, options)
136+
}
137+
138+
// CreateIndexIfNotExists issues a CREATE INDEX query in the N1QLStore keyspace, using the form:
139+
//
140+
// CREATE INDEX indexName ON bucket.Name(expression) IF NOT EXISTS WHERE filterExpression WITH options
141+
//
142+
// Sample usage with resulting statement:
143+
//
144+
// CreateIndex("myIndex", "field1, field2, nested.field", "field1 > 0", N1qlIndexOptions{numReplica:1})
145+
// CREATE INDEX myIndex on myBucket(field1, field2, nested.field) WHERE field1 > 0 WITH {"numReplica":1}
146+
func CreateIndexIfNotExists(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
147+
return createIndex(ctx, store, indexName, expression, filterExpression, true, options)
148+
}
149+
150+
// createIndex is a common function for CreateIndex and CreateIndexIfNotExists
151+
func createIndex(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, ifNotExists bool, options *N1qlIndexOptions) error {
152+
var ifNotExistsStr string
153+
// Server 7.1+ - we can still safely _not_ use this when it's not available, because we have equivalent error handling inside this function to swallow `ErrAlreadyExists`.
154+
// Would still prefer to use it when we can, to guard us against future error string changes, which is why we do both conditionally.
155+
if ifNotExists && store.IsSupported(sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL) {
156+
ifNotExistsStr = " IF NOT EXISTS"
157+
}
132158

133159
// Add filter expression, when present
160+
var filterExpressionStr string
134161
if filterExpression != "" {
135-
createStatement = fmt.Sprintf("%s WHERE %s", createStatement, filterExpression)
162+
filterExpressionStr = " WHERE " + filterExpression
136163
}
137164

138-
// Replace any KeyspaceQueryToken references in the index expression
139-
createStatement = strings.Replace(createStatement, KeyspaceQueryToken, store.EscapedKeyspace(), -1)
165+
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, filterExpressionStr)
140166

141-
createErr := createIndex(ctx, store, indexName, createStatement, options)
142-
if createErr != nil {
143-
if strings.Contains(createErr.Error(), "already exists") || strings.Contains(createErr.Error(), "duplicate index name") {
144-
return ErrAlreadyExists
167+
// Replace any KeyspaceQueryToken references in the index expression
168+
createStatement = strings.ReplaceAll(createStatement, KeyspaceQueryToken, store.EscapedKeyspace())
169+
createErr := createIndexFromStatement(ctx, store, indexName, createStatement, options)
170+
if IsIndexAlreadyExistsError(createErr) || IsCreateDuplicateIndexError(createErr) {
171+
// Pre-7.1 compatibility: Swallow this error like Server does when specifying `IF NOT EXISTS`
172+
if ifNotExists {
173+
return nil
145174
}
175+
return ErrAlreadyExists
146176
}
147177
return createErr
148178
}
149179

150180
func CreatePrimaryIndex(ctx context.Context, store N1QLStore, indexName string, options *N1qlIndexOptions) error {
151181
createStatement := fmt.Sprintf("CREATE PRIMARY INDEX `%s` ON %s", indexName, store.EscapedKeyspace())
152-
return createIndex(ctx, store, indexName, createStatement, options)
182+
return createIndexFromStatement(ctx, store, indexName, createStatement, options)
153183
}
154184

155-
func createIndex(ctx context.Context, store N1QLStore, indexName string, createStatement string, options *N1qlIndexOptions) error {
185+
// ErrIndexBackgroundRetry is returned when an index creation operation returned an error but just needs to wait for a server-side readiness or retry.
186+
var ErrIndexBackgroundRetry = errors.New("Indexer error - waiting for server background retry")
187+
188+
func createIndexFromStatement(ctx context.Context, store N1QLStore, indexName string, createStatement string, options *N1qlIndexOptions) error {
156189

157190
if options != nil {
158191
withClause, marshalErr := JSONMarshal(options)
@@ -162,23 +195,16 @@ func createIndex(ctx context.Context, store N1QLStore, indexName string, createS
162195
createStatement = fmt.Sprintf(`%s with %s`, createStatement, withClause)
163196
}
164197

165-
DebugfCtx(ctx, KeyQuery, "Attempting to create index using statement: [%s]", UD(createStatement))
198+
TracefCtx(ctx, KeyQuery, "Attempting to create index %q using statement: [%s]", indexName, UD(createStatement))
166199

167200
err := store.executeStatement(createStatement)
168201
if err == nil {
169202
return nil
170203
}
171204

172-
if IsIndexerRetryIndexError(err) {
173-
InfofCtx(ctx, KeyQuery, "Indexer error creating index - waiting for server background retry. Error:%v", err)
174-
// Wait for bucket to be created in background before returning
175-
return waitForIndexExistence(ctx, store, indexName, true)
176-
}
177-
178-
if IsCreateDuplicateIndexError(err) {
179-
InfofCtx(ctx, KeyQuery, "Duplicate index creation in progress - waiting for index readiness. Error:%v", err)
180-
// Wait for bucket to be created in background before returning
181-
return waitForIndexExistence(ctx, store, indexName, true)
205+
if IsIndexerRetryIndexError(err) || IsCreateDuplicateIndexError(err) {
206+
DebugfCtx(ctx, KeyQuery, "Index %q is already being created on server: %v", indexName, err)
207+
return fmt.Errorf("%w: %s", ErrIndexBackgroundRetry, err.Error())
182208
}
183209

184210
return pkgerrors.WithStack(RedactErrorf("Error creating index with statement: %s. Error: %v", UD(createStatement), err))
@@ -211,56 +237,35 @@ func waitForIndexExistence(ctx context.Context, store N1QLStore, indexName strin
211237
return nil
212238
}
213239

214-
// BuildDeferredIndexes issues a build command for any deferred sync gateway indexes associated with the bucket.
240+
// BuildDeferredIndexes issues a build command for any deferred sync gateway indexes associated with the N1QLStore keyspace.
215241
func BuildDeferredIndexes(ctx context.Context, s N1QLStore, indexSet []string) error {
216-
217242
if len(indexSet) == 0 {
218243
return nil
219244
}
220245

221-
// Only build indexes that are in deferred state. Query system:indexes to validate the provided set of indexes
222-
statement := fmt.Sprintf("SELECT indexes.name, indexes.state FROM system:indexes WHERE indexes.keyspace_id = '%s'", s.IndexMetaKeyspaceID())
246+
InfofCtx(ctx, KeyQuery, "Building deferred indexes: %v", indexSet)
223247

224-
if s.IndexMetaBucketID() != "" {
225-
statement += fmt.Sprintf("AND indexes.bucket_id = '%s' ", s.IndexMetaBucketID())
226-
}
227-
if s.IndexMetaScopeID() != "" {
228-
statement += fmt.Sprintf("AND indexes.scope_id = '%s' ", s.IndexMetaScopeID())
248+
// the provided indexes can be in a state that is not yet ready to take a build command
249+
// there's a delay between the time of index creation and when it's actually found in the system:indexes table
250+
// this results in buildIndexes returning a not found error for an index that was very recently created
251+
worker := func() (shouldRetry bool, err error, value interface{}) {
252+
err = buildIndexes(ctx, s, indexSet)
253+
if IsIndexNotFoundError(err) {
254+
DebugfCtx(ctx, KeyQuery, "Index not found error when building indexes - will retry: %v", err)
255+
return true, err, nil
256+
}
257+
return err != nil, err, nil
229258
}
230-
231-
statement += fmt.Sprintf("AND indexes.name IN [%s]", StringSliceToN1QLArray(indexSet, "'"))
232-
// mod: bucket name
233-
234-
results, err := s.executeQuery(statement)
259+
sleeper := CreateDoublingSleeperDurationFunc(500, time.Second*30)
260+
err, _ := RetryLoop(ctx, "BuildDeferredIndexes", worker, sleeper)
235261
if err != nil {
236262
return err
237263
}
238-
deferredIndexes := make([]string, 0)
239-
var indexInfo struct {
240-
Name string `json:"name"`
241-
State string `json:"state"`
242-
}
243-
for results.Next(ctx, &indexInfo) {
244-
// If index is deferred (not built), add to set of deferred indexes
245-
if indexInfo.State == IndexStateDeferred {
246-
deferredIndexes = append(deferredIndexes, indexInfo.Name)
247-
}
248-
}
249-
closeErr := results.Close()
250-
if closeErr != nil {
251-
return closeErr
252-
}
253-
254-
if len(deferredIndexes) == 0 {
255-
return nil
256-
}
257264

258-
InfofCtx(ctx, KeyQuery, "Building deferred indexes: %v", deferredIndexes)
259-
buildErr := buildIndexes(ctx, s, deferredIndexes)
260-
return buildErr
265+
return nil
261266
}
262267

263-
// BuildIndexes executes a BUILD INDEX statement in the current bucket, using the form:
268+
// BuildIndexes executes a BUILD INDEX statement in the N1QLStore keyspace, using the form:
264269
//
265270
// BUILD INDEX ON `bucket.Name`(`index1`, `index2`, ...)
266271
func buildIndexes(ctx context.Context, s N1QLStore, indexNames []string) error {
@@ -351,7 +356,7 @@ func getIndexMetaWithoutRetry(ctx context.Context, store N1QLStore, indexName st
351356
return true, indexInfo, nil
352357
}
353358

354-
// DropIndex drops the specified index from the current bucket.
359+
// DropIndex drops the specified index from the N1QLStore keyspace.
355360
func DropIndex(ctx context.Context, store N1QLStore, indexName string) error {
356361
statement := fmt.Sprintf("DROP INDEX default:%s.`%s`", store.EscapedKeyspace(), indexName)
357362

@@ -369,7 +374,7 @@ func DropIndex(ctx context.Context, store N1QLStore, indexName string) error {
369374
return err
370375
}
371376

372-
// AsN1QLStore tries to return the given DataStore as a N1QLStore, based on underlying buckets.
377+
// AsN1QLStore tries to return the given DataStore as a N1QLStore.
373378
func AsN1QLStore(dataStore DataStore) (N1QLStore, bool) {
374379

375380
switch typedDataStore := dataStore.(type) {
@@ -378,7 +383,7 @@ func AsN1QLStore(dataStore DataStore) (N1QLStore, bool) {
378383
case *LeakyDataStore:
379384
return typedDataStore, true
380385
default:
381-
// bail out for unrecognised/unsupported buckets
386+
// bail out for unrecognised/unsupported data store types
382387
return nil, false
383388
}
384389
}
@@ -389,6 +394,9 @@ func AsN1QLStore(dataStore DataStore) (N1QLStore, bool) {
389394
//
390395
// Stuck with doing a string compare to differentiate between 'not found' and other errors
391396
func IsIndexNotFoundError(err error) bool {
397+
if err == nil {
398+
return false
399+
}
392400
return strings.Contains(err.Error(), "not found")
393401
}
394402

@@ -425,6 +433,13 @@ func IsIndexerRetryBuildError(err error) bool {
425433
return false
426434
}
427435

436+
func IsIndexAlreadyExistsError(err error) bool {
437+
if err == nil {
438+
return false
439+
}
440+
return strings.Contains(err.Error(), "already exists")
441+
}
442+
428443
// Check for transient indexer errors (can be retried)
429444
func isTransientIndexerError(err error) bool {
430445
if err == nil {
@@ -594,7 +609,7 @@ func WaitForIndexesOnline(ctx context.Context, keyspace string, mgr *indexManage
594609
if watchedOnlineIndexCount == len(indexNames) {
595610
return false, nil, nil
596611
}
597-
InfofCtx(ctx, KeyAll, "Indexes %s not ready - retrying...", strings.Join(offlineIndexes, ", "))
612+
DebugfCtx(ctx, KeyAll, "Indexes %s not ready - retrying...", strings.Join(offlineIndexes, ", "))
598613
return true, nil, nil
599614
}, retrySleeper)
600615
return err

base/leaky_datastore.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,14 @@ func (lds *LeakyDataStore) CreateIndex(ctx context.Context, indexName string, ex
394394
return n1qlStore.CreateIndex(ctx, indexName, expression, filterExpression, options)
395395
}
396396

397+
func (lds *LeakyDataStore) CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
398+
n1qlStore, err := lds.getN1QLStore()
399+
if err != nil {
400+
return err
401+
}
402+
return n1qlStore.CreateIndexIfNotExists(ctx, indexName, expression, filterExpression, options)
403+
}
404+
397405
func (lds *LeakyDataStore) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
398406
n1qlStore, err := lds.getN1QLStore()
399407
if err != nil {

0 commit comments

Comments
 (0)