Skip to content

Commit 4536fff

Browse files
committed
SCBC-476: Implement ExtGetMulti
Change-Id: I96930a54ee88a03d454c73cc4eca66d4fd8dd9d5 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/231703 Tested-by: Build Bot <[email protected]> Reviewed-by: David Nault <[email protected]>
1 parent 43dd8f0 commit 4536fff

15 files changed

+647
-4
lines changed

scala-client/src/main/scala-2/com/couchbase/client/scala/transactions/ReactiveTransactionAttemptContext.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ import com.couchbase.client.scala.transactions.internal.EncodingUtil.encode
3737
import com.couchbase.client.scala.util.FutureConversions
3838
import com.couchbase.client.scala.{ReactiveCollection, ReactiveScope}
3939
import reactor.core.scala.publisher.SMono
40+
import com.couchbase.client.scala.transactions.getmulti.{
41+
TransactionGetMultiSpec,
42+
TransactionGetMultiOptions,
43+
TransactionGetMultiResult,
44+
TransactionGetMultiReplicasFromPreferredServerGroupSpec,
45+
TransactionGetMultiReplicasFromPreferredServerGroupOptions,
46+
TransactionGetMultiReplicasFromPreferredServerGroupResult,
47+
TransactionGetMultiUtil
48+
}
4049

4150
import scala.util.{Failure, Success};
4251

@@ -132,6 +141,38 @@ class ReactiveTransactionAttemptContext private[scala] (
132141
)
133142
.map(result => TransactionGetResult(result, options.transcoder))
134143

144+
/** Fetches multiple documents in a single operation.
145+
*
146+
* In addition, it will heuristically aim to detect read skew anomalies, and avoid them if possible. Read skew detection and avoidance is not guaranteed.
147+
*
148+
* @param specs the documents to fetch.
149+
* @return a result containing the fetched documents.
150+
*/
151+
def getMulti(
152+
specs: Seq[TransactionGetMultiSpec],
153+
options: TransactionGetMultiOptions = TransactionGetMultiOptions()
154+
): SMono[TransactionGetMultiResult] = {
155+
val coreSpecs = TransactionGetMultiUtil.convert(specs)
156+
FutureConversions
157+
.javaMonoToScalaMono(internal.getMultiAlgo(coreSpecs, options.toCore, false))
158+
.map(res => TransactionGetMultiUtil.convert(res, specs))
159+
}
160+
161+
/** Similar to [[getMulti]], but fetches the documents from replicas in the preferred server group.
162+
*
163+
* Note that the nature of replicas is that they are eventually consistent with the active, and so the effectiveness of read skew detection may be impacted.
164+
*/
165+
def getMultiReplicasFromPreferredServerGroup(
166+
specs: Seq[TransactionGetMultiReplicasFromPreferredServerGroupSpec],
167+
options: TransactionGetMultiReplicasFromPreferredServerGroupOptions =
168+
TransactionGetMultiReplicasFromPreferredServerGroupOptions()
169+
): SMono[TransactionGetMultiReplicasFromPreferredServerGroupResult] = {
170+
val coreSpecs = TransactionGetMultiUtil.convertReplica(specs)
171+
FutureConversions
172+
.javaMonoToScalaMono(internal.getMultiAlgo(coreSpecs, options.toCore, true))
173+
.map(res => TransactionGetMultiUtil.convertReplica(res, specs))
174+
}
175+
135176
/** Inserts a new document into the specified Couchbase <code>collection</code>.
136177
*
137178
* @param collection the Couchbase collection in which to insert the doc

scala-client/src/main/scala/com/couchbase/client/scala/transactions/AsyncTransactionAttemptContext.scala

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ import com.couchbase.client.scala.transactions.config.{
3434
TransactionInsertOptions,
3535
TransactionReplaceOptions
3636
}
37+
import com.couchbase.client.scala.transactions.getmulti.{
38+
TransactionGetMultiOptions,
39+
TransactionGetMultiReplicasFromPreferredServerGroupOptions,
40+
TransactionGetMultiReplicasFromPreferredServerGroupResult,
41+
TransactionGetMultiReplicasFromPreferredServerGroupSpec,
42+
TransactionGetMultiResult,
43+
TransactionGetMultiSpec,
44+
TransactionGetMultiUtil
45+
}
3746
import com.couchbase.client.scala.transactions.internal.EncodingUtil.encode
3847
import com.couchbase.client.scala.util.FutureConversions
3948
import com.couchbase.client.scala.{AsyncCollection, AsyncScope}
@@ -100,6 +109,42 @@ class AsyncTransactionAttemptContext private[scala] (
100109
)
101110
.map(result => TransactionGetResult(result, options.transcoder))
102111

112+
/** Fetches multiple documents in a single operation.
113+
*
114+
* In addition, it will heuristically aim to detect read skew anomalies, and avoid them if possible. Read skew detection and avoidance is not guaranteed.
115+
*
116+
* @param specs the documents to fetch.
117+
* @return a result containing the fetched documents.
118+
*/
119+
def getMulti(
120+
specs: Seq[TransactionGetMultiSpec],
121+
options: TransactionGetMultiOptions = TransactionGetMultiOptions.Default
122+
): Future[TransactionGetMultiResult] = {
123+
val coreSpecs = TransactionGetMultiUtil.convert(specs)
124+
FutureConversions
125+
.javaMonoToScalaFuture(
126+
internal.getMultiAlgo(coreSpecs, options.toCore, false)
127+
)
128+
.map(res => TransactionGetMultiUtil.convert(res, specs))
129+
}
130+
131+
/** Similar to [[getMulti]], but fetches the documents from replicas in the preferred server group.
132+
*
133+
* Note that the nature of replicas is that they are eventually consistent with the active, and so the effectiveness of read skew detection may be impacted.
134+
*/
135+
def getMultiReplicasFromPreferredServerGroup(
136+
specs: Seq[TransactionGetMultiReplicasFromPreferredServerGroupSpec],
137+
options: TransactionGetMultiReplicasFromPreferredServerGroupOptions =
138+
TransactionGetMultiReplicasFromPreferredServerGroupOptions.Default
139+
): Future[TransactionGetMultiReplicasFromPreferredServerGroupResult] = {
140+
val coreSpecs = TransactionGetMultiUtil.convertReplica(specs)
141+
FutureConversions
142+
.javaMonoToScalaFuture(
143+
internal.getMultiAlgo(coreSpecs, options.toCore, true)
144+
)
145+
.map(res => TransactionGetMultiUtil.convertReplica(res, specs))
146+
}
147+
103148
/** Inserts a new document into the specified Couchbase <code>collection</code>.
104149
*
105150
* @param collection the Couchbase collection in which to insert the doc

scala-client/src/main/scala/com/couchbase/client/scala/transactions/TransactionAttemptContext.scala

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,27 @@
1515
*/
1616
package com.couchbase.client.scala.transactions
1717

18-
import com.couchbase.client.core.cnc.TracingIdentifiers.{TRANSACTION_OP_INSERT, TRANSACTION_OP_REMOVE, TRANSACTION_OP_REPLACE}
1918
import com.couchbase.client.core.cnc.{CbTracing, RequestSpan, TracingIdentifiers}
2019
import com.couchbase.client.core.error.CouchbaseException
2120
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext
2221
import com.couchbase.client.core.transaction.log.CoreTransactionLogger
2322
import com.couchbase.client.core.transaction.support.SpanWrapper
2423
import com.couchbase.client.scala.codec.JsonSerializer
25-
import com.couchbase.client.scala.transactions.config.{TransactionGetOptions, TransactionGetReplicaFromPreferredServerGroupOptions, TransactionInsertOptions, TransactionReplaceOptions}
24+
import com.couchbase.client.scala.transactions.getmulti.{
25+
TransactionGetMultiSpec,
26+
TransactionGetMultiOptions,
27+
TransactionGetMultiResult,
28+
TransactionGetMultiReplicasFromPreferredServerGroupSpec,
29+
TransactionGetMultiReplicasFromPreferredServerGroupOptions,
30+
TransactionGetMultiReplicasFromPreferredServerGroupResult,
31+
TransactionGetMultiUtil
32+
}
33+
import com.couchbase.client.scala.transactions.config.{
34+
TransactionGetOptions,
35+
TransactionGetReplicaFromPreferredServerGroupOptions,
36+
TransactionInsertOptions,
37+
TransactionReplaceOptions
38+
}
2639
import com.couchbase.client.scala.transactions.internal.EncodingUtil.encode
2740
import com.couchbase.client.scala.util.{AsyncUtils, FutureConversions}
2841
import com.couchbase.client.scala.{Collection, Scope}
@@ -83,6 +96,32 @@ class TransactionAttemptContext private[scala] (
8396
AsyncUtils.block(internal.getReplicaFromPreferredServerGroup(collection.async, id, options))
8497
}
8598

99+
/** Fetches multiple documents in a single operation.
100+
*
101+
* In addition, it will heuristically aim to detect read skew anomalies, and avoid them if possible. Read skew detection and avoidance is not guaranteed.
102+
*
103+
* @param specs the documents to fetch.
104+
* @return a result containing the fetched documents.
105+
*/
106+
def getMulti(
107+
specs: Seq[TransactionGetMultiSpec],
108+
options: TransactionGetMultiOptions = TransactionGetMultiOptions.Default
109+
): Try[TransactionGetMultiResult] = {
110+
AsyncUtils.block(internal.getMulti(specs, options))
111+
}
112+
113+
/** Similar to [[getMulti]], but fetches the documents from replicas in the preferred server group.
114+
*
115+
* Note that the nature of replicas is that they are eventually consistent with the active, and so the effectiveness of read skew detection may be impacted.
116+
*/
117+
def getMultiReplicasFromPreferredServerGroup(
118+
specs: Seq[TransactionGetMultiReplicasFromPreferredServerGroupSpec],
119+
options: TransactionGetMultiReplicasFromPreferredServerGroupOptions =
120+
TransactionGetMultiReplicasFromPreferredServerGroupOptions.Default
121+
): Try[TransactionGetMultiReplicasFromPreferredServerGroupResult] = {
122+
AsyncUtils.block(internal.getMultiReplicasFromPreferredServerGroup(specs, options))
123+
}
124+
86125
/** Mutates the specified <code>doc</code> with new content.
87126
* <p>
88127
* The mutation is staged until the transaction is committed. That is, any read of the document by any Couchbase
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.couchbase.client.scala.transactions.getmulti
17+
18+
import com.couchbase.client.core.annotation.Stability
19+
import com.couchbase.client.core.transaction.getmulti.CoreTransactionGetMultiMode
20+
21+
/** Controls how a getMulti operation behaves.
22+
*/
23+
@Stability.Uncommitted
24+
sealed trait TransactionGetMultiMode {
25+
private[getmulti] def toCore: CoreTransactionGetMultiMode
26+
}
27+
28+
object TransactionGetMultiMode {
29+
30+
/** Some time-bounded effort will be made to detect and avoid read skew. */
31+
case object PrioritiseLatency extends TransactionGetMultiMode {
32+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
33+
CoreTransactionGetMultiMode.PRIORITISE_LATENCY
34+
}
35+
36+
/** No read-skew detection will be attempted. Once the documents are fetched they will be returned immediately. */
37+
case object DisableReadSkewDetection extends TransactionGetMultiMode {
38+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
39+
CoreTransactionGetMultiMode.DISABLE_READ_SKEW_DETECTION
40+
}
41+
42+
/** Great effort will be made to detect and avoid read skew. */
43+
case object PrioritiseReadSkewDetection extends TransactionGetMultiMode {
44+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
45+
CoreTransactionGetMultiMode.PRIORITISE_READ_SKEW_DETECTION
46+
}
47+
48+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.couchbase.client.scala.transactions.getmulti
17+
18+
import com.couchbase.client.core.annotation.Stability
19+
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiOptions
20+
21+
/** Customize how a getMulti() operation runs.
22+
*/
23+
@Stability.Uncommitted
24+
case class TransactionGetMultiOptions(
25+
mode: Option[TransactionGetMultiMode] = None
26+
) {
27+
28+
/** Controls how the operation behaves - see [[TransactionGetMultiMode]] for details.
29+
*
30+
* If not explicitly set, the default behaviour is intentionally unspecified, and may change in future versions.
31+
*/
32+
def mode(mode: TransactionGetMultiMode): TransactionGetMultiOptions =
33+
copy(mode = Some(mode))
34+
35+
/** Converts this options instance into the core representation. */
36+
private[transactions] def toCore: CoreGetMultiOptions =
37+
new CoreGetMultiOptions(mode.map(_.toCore).orNull)
38+
}
39+
40+
object TransactionGetMultiOptions {
41+
private[transactions] val Default: TransactionGetMultiOptions = TransactionGetMultiOptions()
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.couchbase.client.scala.transactions.getmulti
17+
18+
import com.couchbase.client.core.annotation.Stability
19+
import com.couchbase.client.core.transaction.getmulti.CoreTransactionGetMultiMode
20+
21+
/** Controls how a getMultiReplicasFromPreferredServerGroup operation behaves.
22+
*/
23+
@Stability.Uncommitted
24+
sealed trait TransactionGetMultiReplicasFromPreferredServerGroupMode {
25+
private[getmulti] def toCore: CoreTransactionGetMultiMode
26+
}
27+
28+
object TransactionGetMultiReplicasFromPreferredServerGroupMode {
29+
import TransactionGetMultiMode._
30+
31+
/** Some time-bounded effort will be made to detect and avoid read skew. */
32+
case object PrioritiseLatency extends TransactionGetMultiReplicasFromPreferredServerGroupMode {
33+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
34+
CoreTransactionGetMultiMode.PRIORITISE_LATENCY
35+
}
36+
37+
/** No read-skew detection will be attempted. */
38+
case object DisableReadSkewDetection
39+
extends TransactionGetMultiReplicasFromPreferredServerGroupMode {
40+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
41+
CoreTransactionGetMultiMode.DISABLE_READ_SKEW_DETECTION
42+
}
43+
44+
/** Great effort will be made to detect and avoid read skew. */
45+
case object PrioritiseReadSkewDetection
46+
extends TransactionGetMultiReplicasFromPreferredServerGroupMode {
47+
override private[getmulti] def toCore: CoreTransactionGetMultiMode =
48+
CoreTransactionGetMultiMode.PRIORITISE_READ_SKEW_DETECTION
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.couchbase.client.scala.transactions.getmulti
17+
18+
import com.couchbase.client.core.annotation.Stability
19+
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiOptions
20+
21+
/** Customize how a getMultiReplicasFromPreferredServerGroup() operation runs.
22+
*/
23+
@Stability.Uncommitted
24+
case class TransactionGetMultiReplicasFromPreferredServerGroupOptions(
25+
mode: Option[TransactionGetMultiReplicasFromPreferredServerGroupMode] = None
26+
) {
27+
28+
/** Controls how the operation behaves - see [[TransactionGetMultiReplicasFromPreferredServerGroupMode]] for details.
29+
*
30+
* If not explicitly set, the default behaviour is intentionally unspecified, and may change in future versions.
31+
*/
32+
def mode(
33+
mode: TransactionGetMultiReplicasFromPreferredServerGroupMode
34+
): TransactionGetMultiReplicasFromPreferredServerGroupOptions =
35+
copy(mode = Some(mode))
36+
37+
/** Converts this options instance into the core representation. */
38+
private[transactions] def toCore: CoreGetMultiOptions =
39+
new CoreGetMultiOptions(mode.map(_.toCore).orNull)
40+
}
41+
42+
object TransactionGetMultiReplicasFromPreferredServerGroupOptions {
43+
private[transactions] val Default: TransactionGetMultiReplicasFromPreferredServerGroupOptions =
44+
TransactionGetMultiReplicasFromPreferredServerGroupOptions()
45+
}

0 commit comments

Comments
 (0)