Skip to content

Commit 7c14d0b

Browse files
committed
JVMCBC-1458: Spark reports java.io.NotSerializableException: com.couchbase.client.core.api.kv.CoreSubdocGetResult
Any class used by the Spark Connector needs to implement Serializable, so it can be serialized and sent between worker nodes. Optional, unfortunately, is not Serializable. Change-Id: I998a0d169810bc4a597839de0b54b69c5f109100 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/203205 Reviewed-by: David Nault <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 6892622 commit 7c14d0b

File tree

11 files changed

+96
-33
lines changed

11 files changed

+96
-33
lines changed

core-io/src/main/java/com/couchbase/client/core/CoreKeyspace.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
import com.couchbase.client.core.annotation.Stability;
2020
import com.couchbase.client.core.io.CollectionIdentifier;
2121

22+
import java.io.Serializable;
2223
import java.util.Objects;
2324
import java.util.Optional;
2425

2526
import static java.util.Objects.requireNonNull;
2627

2728
@Stability.Internal
28-
public final class CoreKeyspace {
29+
public final class CoreKeyspace implements Serializable {
2930
private final String bucket;
3031
private final String scope;
3132
private final String collection;

core-io/src/main/java/com/couchbase/client/core/api/kv/CoreKvResponseMetadata.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
2121
import reactor.util.annotation.Nullable;
2222

23+
import java.io.Serializable;
2324
import java.util.OptionalInt;
2425
import java.util.OptionalLong;
2526

26-
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
2727
@Stability.Internal
28-
public final class CoreKvResponseMetadata {
29-
private final OptionalInt readUnits;
30-
private final OptionalInt writeUnits;
31-
private final OptionalLong serverDuration;
28+
public final class CoreKvResponseMetadata implements Serializable {
29+
private final @Nullable Integer readUnits;
30+
private final @Nullable Integer writeUnits;
31+
private final @Nullable Long serverDuration;
3232

3333
public static final CoreKvResponseMetadata NONE = new CoreKvResponseMetadata(-1, -1, -1);
3434

@@ -45,20 +45,20 @@ public static CoreKvResponseMetadata of(int readUnits, int writeUnits, long serv
4545
}
4646

4747
private CoreKvResponseMetadata(int readUnits, int writeUnits, long serverDuration) {
48-
this.readUnits = readUnits < 0 ? OptionalInt.empty() : OptionalInt.of(readUnits);
49-
this.writeUnits = writeUnits < 0 ? OptionalInt.empty() : OptionalInt.of(writeUnits);
50-
this.serverDuration = serverDuration < 0 ? OptionalLong.empty() : OptionalLong.of(serverDuration);
48+
this.readUnits = readUnits < 0 ? null : readUnits;
49+
this.writeUnits = writeUnits < 0 ? null : writeUnits;
50+
this.serverDuration = serverDuration < 0 ? null : serverDuration;
5151
}
5252

53-
public OptionalInt readUnits() {
53+
public @Nullable Integer readUnits() {
5454
return readUnits;
5555
}
5656

57-
public OptionalInt writeUnits() {
57+
public @Nullable Integer writeUnits() {
5858
return writeUnits;
5959
}
6060

61-
public OptionalLong serverDuration() {
61+
public @Nullable Long serverDuration() {
6262
return serverDuration;
6363
}
6464

core-io/src/main/java/com/couchbase/client/core/api/kv/CoreKvResult.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import com.couchbase.client.core.annotation.Stability;
2121
import reactor.util.annotation.Nullable;
2222

23+
import java.io.Serializable;
24+
2325
import static java.util.Objects.requireNonNull;
2426

2527
@Stability.Internal
26-
public abstract class CoreKvResult {
28+
public abstract class CoreKvResult implements Serializable {
2729
private final CoreKeyspace keyspace;
2830
private final String key;
2931
private final CoreKvResponseMetadata meta;

core-io/src/main/java/com/couchbase/client/core/io/CollectionIdentifier.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.couchbase.client.core.io;
1818

19+
import reactor.util.annotation.Nullable;
20+
21+
import java.io.Serializable;
1922
import java.util.HashMap;
2023
import java.util.Map;
2124
import java.util.Objects;
@@ -29,15 +32,15 @@
2932
*
3033
* @since 2.0.0
3134
*/
32-
public class CollectionIdentifier {
35+
public class CollectionIdentifier implements Serializable {
3336

3437
public static final String DEFAULT_SCOPE = "_default";
3538
public static final String DEFAULT_COLLECTION = "_default";
3639

3740
private final String bucket;
3841
private final boolean isDefault;
39-
private final Optional<String> scope;
40-
private final Optional<String> collection;
42+
private final @Nullable String scope;
43+
private final @Nullable String collection;
4144

4245
public static CollectionIdentifier fromDefault(String bucket) {
4346
return new CollectionIdentifier(bucket, Optional.of(DEFAULT_SCOPE), Optional.of(DEFAULT_COLLECTION));
@@ -49,8 +52,8 @@ public CollectionIdentifier(String bucket, Optional<String> scope, Optional<Stri
4952
requireNonNull(collection);
5053

5154
this.bucket = bucket;
52-
this.scope = scope;
53-
this.collection = collection;
55+
this.scope = scope.orElse(null);
56+
this.collection = collection.orElse(null);
5457
this.isDefault = Optional.of(DEFAULT_SCOPE).equals(scope) && Optional.of(DEFAULT_COLLECTION).equals(collection);
5558
}
5659

@@ -59,11 +62,11 @@ public String bucket() {
5962
}
6063

6164
public Optional<String> scope() {
62-
return scope;
65+
return Optional.ofNullable(scope);
6366
}
6467

6568
public Optional<String> collection() {
66-
return collection;
69+
return Optional.ofNullable(collection);
6770
}
6871

6972
public boolean isDefault() {

java-client/HOWTO-ABSENT-VALUES.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ Prefer @Nullable, unless Optional offers a clear advantage.
5353

5454
For example, if a field is used exclusively by an accessor that returns Optional, it's reasonable for the field to be an Optional, so the accessor doesn't need to create a new Optional on every call.
5555

56+
Note that any class ultimately accessed by the Couchbase Spark Connector needs to implement Serializable, and Optional is not Serializable.
57+
This will apply to any class in core-io that is ultimately in a Scala SDK object being returned to the user.
58+
59+
References:
60+
61+
* https://issues.couchbase.com/browse/JVMCBC-1458[JVMCBC-1458]
62+
* https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc/24564612#24564612[StackOverflow post]
63+
5664
== Constructors
5765

5866
Constructors should verify that parameters not annotated as @Nullable are actually non-null.

scala-fit-performer/src/main/scala/com/couchbase/client/performer/scala/ScalaSdkCommandExecutor.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.couchbase.client.performer.core.util.ErrorUtil
2525
import com.couchbase.client.performer.core.util.TimeUtil.getTimeNow
2626
import com.couchbase.client.performer.scala.Content.{ContentByteArray, ContentJson, ContentNull, ContentString}
2727
import com.couchbase.client.performer.scala.ScalaSdkCommandExecutor._
28+
import com.couchbase.client.performer.scala.util.SerializableValidation.assertIsSerializable
2829
// [start:1.5.0]
2930
import com.couchbase.client.performer.scala.kv.{GetReplicaHelper, MutateInHelper}
3031
// [end:1.5.0]
@@ -518,6 +519,7 @@ object ScalaSdkCommandExecutor {
518519

519520
// [start:1.5.0]
520521
def processScanResult(request: Scan, r: ScanResult): com.couchbase.client.protocol.run.Result = {
522+
assertIsSerializable(r)
521523
val builder = com.couchbase.client.protocol.sdk.kv.rangescan.ScanResult.newBuilder
522524
.setId(r.id)
523525
.setIdOnly(r.idOnly)
@@ -646,6 +648,7 @@ object ScalaSdkCommandExecutor {
646648
case Right(expiry) => out.expiry(expiry)
647649
}
648650
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
651+
assertIsSerializable(out)
649652
out
650653
} else null
651654
}
@@ -658,6 +661,7 @@ object ScalaSdkCommandExecutor {
658661
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
659662
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
660663
if (opts.hasCas) out = out.cas(opts.getCas)
664+
assertIsSerializable(out)
661665
out
662666
} else null
663667
}
@@ -672,6 +676,7 @@ object ScalaSdkCommandExecutor {
672676
if (opts.getProjectionCount > 0)
673677
out = out.project(opts.getProjectionList.asByteStringList().toSeq.map(v => v.toStringUtf8))
674678
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
679+
assertIsSerializable(out)
675680
out
676681
} else null
677682
}
@@ -712,6 +717,7 @@ object ScalaSdkCommandExecutor {
712717
// [start:1.1.5]
713718
if (opts.hasPreserveExpiry) out = out.preserveExpiry(opts.getPreserveExpiry)
714719
// [end:1.1.5]
720+
assertIsSerializable(out)
715721
out
716722
} else null
717723
}
@@ -751,6 +757,7 @@ object ScalaSdkCommandExecutor {
751757
// [start:1.1.5]
752758
if (opts.hasPreserveExpiry) out = out.preserveExpiry(opts.getPreserveExpiry)
753759
// [end:1.1.5]
760+
assertIsSerializable(out)
754761
out
755762
} else null
756763
}
@@ -772,6 +779,7 @@ object ScalaSdkCommandExecutor {
772779
if (opts.hasBatchTimeLimit) throw new UnsupportedOperationException("Cannot support batch time limit");
773780
// Will add when adding support for Caps.OBSERVABILITY_1.
774781
// if (opts.hasParentSpanId) out = out.parentSpan(spans.get(opts.getParentSpanId))
782+
assertIsSerializable(out)
775783
out
776784
} else null
777785
}
@@ -785,6 +793,7 @@ object ScalaSdkCommandExecutor {
785793
if (opts.hasTimeoutMsecs)
786794
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
787795
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
796+
assertIsSerializable(out)
788797
out
789798
} else null
790799
}
@@ -795,6 +804,7 @@ object ScalaSdkCommandExecutor {
795804
var out = UnlockOptions()
796805
if (opts.hasTimeoutMsecs)
797806
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
807+
assertIsSerializable(out)
798808
out
799809
} else null
800810
}
@@ -805,6 +815,7 @@ object ScalaSdkCommandExecutor {
805815
var out = ExistsOptions()
806816
if (opts.hasTimeoutMsecs)
807817
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
818+
assertIsSerializable(out)
808819
out
809820
} else null
810821
}
@@ -815,6 +826,7 @@ object ScalaSdkCommandExecutor {
815826
var out = TouchOptions()
816827
if (opts.hasTimeoutMsecs)
817828
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
829+
assertIsSerializable(out)
818830
out
819831
} else null
820832
}
@@ -826,6 +838,7 @@ object ScalaSdkCommandExecutor {
826838
if (opts.hasTimeoutMsecs)
827839
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
828840
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
841+
assertIsSerializable(out)
829842
out
830843
} else null
831844
}
@@ -837,6 +850,7 @@ object ScalaSdkCommandExecutor {
837850
if (opts.hasTimeoutMsecs)
838851
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
839852
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
853+
assertIsSerializable(out)
840854
out
841855
} else null
842856
}
@@ -848,6 +862,7 @@ object ScalaSdkCommandExecutor {
848862
if (opts.hasTimeoutMsecs)
849863
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
850864
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
865+
assertIsSerializable(out)
851866
out
852867
} else null
853868
}
@@ -864,6 +879,7 @@ object ScalaSdkCommandExecutor {
864879
case Left(expiry) => out.expiry(expiry)
865880
case Right(expiry) => out.expiry(expiry)
866881
}
882+
assertIsSerializable(out)
867883
Some(out)
868884
} else None
869885
}
@@ -880,6 +896,7 @@ object ScalaSdkCommandExecutor {
880896
case Left(expiry) => out.expiry(expiry)
881897
case Right(expiry) => out.expiry(expiry)
882898
}
899+
assertIsSerializable(out)
883900
Some(out)
884901
} else None
885902
}
@@ -892,6 +909,7 @@ object ScalaSdkCommandExecutor {
892909
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
893910
if (opts.hasCas) out = out.cas(opts.getCas)
894911
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
912+
assertIsSerializable(out)
895913
Some(out)
896914
} else None
897915
}
@@ -904,6 +922,7 @@ object ScalaSdkCommandExecutor {
904922
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
905923
if (opts.hasCas) out = out.cas(opts.getCas)
906924
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
925+
assertIsSerializable(out)
907926
Some(out)
908927
} else None
909928
}
@@ -926,6 +945,7 @@ object ScalaSdkCommandExecutor {
926945
result: com.couchbase.client.protocol.run.Result.Builder,
927946
value: MutationResult
928947
): Unit = {
948+
assertIsSerializable(result)
929949
val builder = com.couchbase.client.protocol.sdk.kv.MutationResult.newBuilder.setCas(value.cas)
930950
value.mutationToken.foreach(
931951
mt =>
@@ -945,6 +965,7 @@ object ScalaSdkCommandExecutor {
945965
result: com.couchbase.client.protocol.run.Result.Builder,
946966
value: GetResult
947967
): Unit = {
968+
assertIsSerializable(result)
948969
val builder = com.couchbase.client.protocol.sdk.kv.GetResult.newBuilder
949970
.setCas(value.cas)
950971

@@ -973,6 +994,7 @@ object ScalaSdkCommandExecutor {
973994
result: com.couchbase.client.protocol.run.Result.Builder,
974995
value: ExistsResult
975996
): Unit = {
997+
assertIsSerializable(value)
976998
result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder
977999
.setExistsResult(com.couchbase.client.protocol.sdk.kv.ExistsResult.newBuilder
9781000
.setCas(value.cas)
@@ -983,6 +1005,7 @@ object ScalaSdkCommandExecutor {
9831005
result: com.couchbase.client.protocol.run.Result.Builder,
9841006
value: CounterResult
9851007
): Unit = {
1008+
assertIsSerializable(value)
9861009
val builder = com.couchbase.client.protocol.sdk.kv.CounterResult.newBuilder
9871010
.setCas(value.cas)
9881011
.setContent(value.content)
@@ -1021,6 +1044,7 @@ object ScalaSdkCommandExecutor {
10211044
}
10221045

10231046
def convertException(raw: Throwable): com.couchbase.client.protocol.shared.Exception = {
1047+
assertIsSerializable(raw)
10241048
val ret = com.couchbase.client.protocol.shared.Exception.newBuilder
10251049

10261050
if (raw.isInstanceOf[CouchbaseException] || raw.isInstanceOf[UnsupportedOperationException]) {
@@ -1068,7 +1092,7 @@ object ScalaSdkCommandExecutor {
10681092
}
10691093
options = options.serviceTypes(services)
10701094
}
1071-
1095+
assertIsSerializable(options)
10721096
options
10731097
}
10741098
}

scala-fit-performer/src/main/scala/com/couchbase/client/performer/scala/kv/GetReplicaHelper.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package com.couchbase.client.performer.scala.kv
2020
import com.couchbase.client.performer.core.perf.PerRun
2121
import com.couchbase.client.performer.core.util.TimeUtil.getTimeNow
2222
import com.couchbase.client.performer.scala.ScalaSdkCommandExecutor.{convertException, convertTranscoder, setSuccess}
23+
import com.couchbase.client.performer.scala.util.SerializableValidation.assertIsSerializable
2324
import com.couchbase.client.performer.scala.util.{ClusterConnection, ContentAsUtil, ScalaFluxStreamer, ScalaIteratorStreamer}
2425
import com.couchbase.client.protocol.run.Result
2526
import com.couchbase.client.protocol.sdk.{CollectionLevelCommand, Command}
@@ -250,6 +251,7 @@ object GetReplicaHelper {
250251
if (opts.hasTimeoutMsecs)
251252
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
252253
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
254+
assertIsSerializable(out)
253255
Some(out)
254256
} else None
255257
}
@@ -263,6 +265,7 @@ object GetReplicaHelper {
263265
if (opts.hasTimeoutMsecs)
264266
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
265267
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
268+
assertIsSerializable(out)
266269
Some(out)
267270
} else None
268271
}
@@ -272,6 +275,7 @@ object GetReplicaHelper {
272275
value: GetReplicaResult,
273276
streamId: Option[String] = None
274277
): com.couchbase.client.protocol.sdk.kv.GetReplicaResult = {
278+
assertIsSerializable(value)
275279
val builder = com.couchbase.client.protocol.sdk.kv.GetReplicaResult.newBuilder
276280
.setCas(value.cas)
277281

0 commit comments

Comments
 (0)