|
16 | 16 |
|
17 | 17 | package com.couchbase.client.java.manager.query; |
18 | 18 |
|
| 19 | +import com.couchbase.client.core.Reactor; |
19 | 20 | import com.couchbase.client.core.annotation.Stability; |
20 | 21 | import com.couchbase.client.core.cnc.RequestSpan; |
21 | 22 | import com.couchbase.client.core.cnc.TracingIdentifiers; |
@@ -409,33 +410,38 @@ public CompletableFuture<Void> buildDeferredIndexes(final String bucketName, fin |
409 | 410 | notNull(options, "Options"); |
410 | 411 | final BuildQueryIndexOptions.Built builtOpts = options.build(); |
411 | 412 |
|
412 | | - String statement; |
413 | | - JsonArray parameters; |
414 | | - if (builtOpts.collectionName().isPresent() && builtOpts.scopeName().isPresent()) { |
415 | | - String keyspace = buildKeyspace(bucketName, builtOpts.scopeName(), builtOpts.collectionName()); |
416 | | - |
417 | | - statement = "BUILD INDEX ON " + keyspace + " (" + |
418 | | - " (" + |
419 | | - " SELECT RAW name FROM system:indexes " + |
420 | | - " WHERE bucket_id = ?" + |
421 | | - " AND scope_id = ?" + |
422 | | - " AND keyspace_id = ?" + |
423 | | - " AND state = \"deferred\"" + |
424 | | - " )" + |
425 | | - ")"; |
426 | | - parameters = JsonArray.from(bucketName, builtOpts.scopeName().get(), builtOpts.collectionName().get()); |
427 | | - } else { |
428 | | - statement = "BUILD INDEX ON `" + bucketName + "` (" + |
429 | | - " (" + |
430 | | - " SELECT RAW name FROM system:indexes " + |
431 | | - " WHERE keyspace_id = ? AND bucket_id IS MISSING AND state = \"deferred\"" + |
432 | | - " )" + |
433 | | - ")"; |
434 | | - parameters = JsonArray.from(bucketName); |
435 | | - } |
436 | 413 |
|
437 | | - return exec(WRITE, statement, builtOpts, TracingIdentifiers.SPAN_REQUEST_MQ_BUILD_DEFERRED_INDEXES, bucketName, parameters) |
438 | | - .thenApply(result -> null); |
| 414 | + GetAllQueryIndexesOptions getAllOptions = getAllQueryIndexesOptions(); |
| 415 | + builtOpts.collectionName().ifPresent(getAllOptions::collectionName); |
| 416 | + builtOpts.scopeName().ifPresent(getAllOptions::scopeName); |
| 417 | + builtOpts.timeout().ifPresent(getAllOptions::timeout); |
| 418 | + |
| 419 | + return Reactor |
| 420 | + .toMono(() -> getAllIndexes(bucketName, getAllOptions)) |
| 421 | + .map(indexes -> indexes |
| 422 | + .stream() |
| 423 | + .filter(idx -> idx.state().equals("deferred")) |
| 424 | + .map(idx -> quote(idx.name())) |
| 425 | + .collect(Collectors.toList()) |
| 426 | + ) |
| 427 | + .flatMap(indexNames -> { |
| 428 | + if (indexNames.isEmpty()) { |
| 429 | + return Mono.empty(); |
| 430 | + } |
| 431 | + |
| 432 | + String keyspace = builtOpts.collectionName().isPresent() && builtOpts.scopeName().isPresent() |
| 433 | + ? buildKeyspace(bucketName, builtOpts.scopeName(), builtOpts.collectionName()) |
| 434 | + : quote(bucketName); |
| 435 | + |
| 436 | + String statement = "BUILD INDEX ON " + keyspace + " (" + String.join(",", indexNames) + ")"; |
| 437 | + |
| 438 | + return Reactor.toMono( |
| 439 | + () -> exec(WRITE, statement, builtOpts, TracingIdentifiers.SPAN_REQUEST_MQ_BUILD_DEFERRED_INDEXES, bucketName, null) |
| 440 | + .thenApply(result -> null) |
| 441 | + ); |
| 442 | + }) |
| 443 | + .then() |
| 444 | + .toFuture(); |
439 | 445 | } |
440 | 446 |
|
441 | 447 | /** |
|
0 commit comments