
PubSub: optionally raise error if batch overflows on first message #7107

@relud


Is your feature request related to a problem? Please describe.

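Calling google.cloud.pubsub.PublisherClient.publish with very large data causes exceptions in the Pub/Sub emulator when the PublishRequest's ByteSize() exceeds either the emulator's grpc.max_receive_message_length or the client's grpc.max_send_message_length. Lowering BatchSettings.max_bytes does not prevent this, because an empty batch accepts a message of any size.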
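
One way to see why a single large message trips these limits is to estimate the request size from the protobuf wire format. The sketch below is a hand-rolled approximation of PublishRequest.ByteSize(), not library code. It assumes the request carries only topic (field 1) and the repeated messages (field 2), takes each PubsubMessage's serialized size as given, and uses 4 MiB as the receive limit. That is gRPC's usual default but only an assumption about the emulator. The helper names are hypothetical.

```python
# Sketch: estimate PublishRequest.ByteSize() from protobuf wire-format rules.
# Assumes PublishRequest { string topic = 1; repeated PubsubMessage messages = 2; }
# and that each message's serialized size is already known.

GRPC_DEFAULT_MAX_RECEIVE = 4 * 1024 * 1024  # assumed gRPC receive limit (4 MiB)


def varint_size(value: int) -> int:
    """Bytes needed to encode a non-negative int as a protobuf varint."""
    size = 1
    while value >= 0x80:
        value >>= 7
        size += 1
    return size


def length_delimited_size(payload_len: int) -> int:
    """1-byte tag (field numbers 1-15) + length varint + payload bytes."""
    return 1 + varint_size(payload_len) + payload_len


def publish_request_size(topic: str, message_sizes: list[int]) -> int:
    """Estimated serialized size of a PublishRequest (proto3 omits an empty topic)."""
    size = length_delimited_size(len(topic.encode("utf-8"))) if topic else 0
    for msg_size in message_sizes:
        size += length_delimited_size(msg_size)
    return size


if __name__ == "__main__":
    topic = "projects/my-project/topics/my-topic"  # hypothetical topic name
    # A message just under 4 MiB still overflows once request framing is added.
    one_message = GRPC_DEFAULT_MAX_RECEIVE - 10
    total = publish_request_size(topic, [one_message])
    print(total, total > GRPC_DEFAULT_MAX_RECEIVE)
```

Even a batch holding exactly one message adds a few bytes of framing plus the topic name, so the request is always larger than the message itself.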

Describe the solution you'd like

I would like an opt-in setting, off by default, that makes the client strictly enforce BatchSettings.max_bytes by raising ValueError when a single message overflows an empty batch:

diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
index f187024b7c..008ec21a78 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -289,6 +289,9 @@ class Batch(base.Batch):
             )

             if not self._messages or not overflow:
+                if overflow and self.settings.strict:
+                    # Refuse to make batch exceed self.settings if strict
+                    raise ValueError("Message overflows empty batch")

                 # Store the actual message in the batch's message queue.
                 self._messages.append(message)
diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index c2662cf836..c7f5b1e5bd 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -36,12 +36,13 @@ from google.cloud.pubsub_v1.proto import pubsub_pb2
 # these settings can be altered to tweak Pub/Sub behavior.
 # The defaults should be fine for most use cases.
 BatchSettings = collections.namedtuple(
-    "BatchSettings", ["max_bytes", "max_latency", "max_messages"]
+    "BatchSettings", ["max_bytes", "max_latency", "max_messages", "strict"]
 )
 BatchSettings.__new__.__defaults__ = (
     1000 * 1000 * 10,  # max_bytes: documented "10 MB", enforced 10000000
     0.05,  # max_latency: 0.05 seconds
     1000,  # max_messages: 1,000
+    False,  # do not raise ValueError when a message overflows an empty batch
 )

 # Define the type class and default values for flow control settings.
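
The acceptance rule in the patch can be modeled without the library. The sketch below is a hypothetical, simplified stand-in for Batch.publish (SketchBatch and accept are made-up names). A batch that already holds messages refuses an overflowing one so the caller can start a new batch. An empty batch either accepts the oversized message (current behavior) or raises ValueError when strict is set (proposed behavior). Only byte sizes are tracked; max_messages and max_latency are ignored.

```python
# Hypothetical, simplified model of the batch acceptance rule in the patch.
# Only byte sizes are tracked; max_messages and max_latency are ignored.
from dataclasses import dataclass, field


@dataclass
class SketchBatch:
    max_bytes: int
    strict: bool = False
    sizes: list[int] = field(default_factory=list)  # sizes of accepted messages

    def accept(self, message_size: int) -> bool:
        """Return True if the message joins this batch, False if a new batch is needed.

        Raises ValueError when strict is set and the message alone overflows
        an empty batch.
        """
        overflow = sum(self.sizes) + message_size > self.max_bytes
        if not self.sizes or not overflow:
            if overflow and self.strict:
                # Refuse to let an empty batch exceed max_bytes.
                raise ValueError("Message overflows empty batch")
            self.sizes.append(message_size)
            return True
        # Non-empty batch that would overflow: the caller starts a new batch.
        return False


lenient = SketchBatch(max_bytes=100)
print(lenient.accept(500))  # an empty batch takes an oversized message

strict = SketchBatch(max_bytes=100, strict=True)
try:
    strict.accept(500)
except ValueError as exc:
    print("rejected:", exc)
```

Inside the `if not self._messages or not overflow` branch, overflow can only be true when the batch is empty, which is why the patch's strict check fires only for a single oversized message.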

Describe alternatives you've considered

I can enforce this outside the library, but doing so is inefficient: PublisherClient.publish accepts raw data and attributes rather than a PubsubMessage, so the size cached when I call PubsubMessage.ByteSize() cannot be reused, and the library sizes the message again.
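
For comparison, the workaround outside the library might look like the following sketch. It estimates the serialized PubsubMessage size from the wire format (data as field 1, attributes as a map in field 2, all other fields assumed unset) and raises ValueError before calling publish. The names pubsub_message_size and publish_checked are hypothetical. The publish argument is any callable shaped like PublisherClient.publish(topic, data, **attrs), so the sketch runs without google-cloud-pubsub installed.

```python
# Hypothetical client-side guard, sketched without google-cloud-pubsub.
# Sizes follow the protobuf wire format for PubsubMessage fields
# data (bytes, field 1) and attributes (map<string, string>, field 2);
# message_id, publish_time and ordering_key are assumed unset.
from typing import Callable, Dict


def varint_size(value: int) -> int:
    """Bytes needed to encode a non-negative int as a protobuf varint."""
    size = 1
    while value >= 0x80:
        value >>= 7
        size += 1
    return size


def field_size(payload_len: int) -> int:
    """1-byte tag (field numbers 1-15) + length varint + payload bytes."""
    return 1 + varint_size(payload_len) + payload_len


def pubsub_message_size(data: bytes, attributes: Dict[str, str]) -> int:
    size = field_size(len(data)) if data else 0  # proto3 omits empty bytes
    for key, value in attributes.items():
        # Each map entry is an embedded {key = 1; value = 2} message; this
        # assumes non-empty keys and values, so both fields are counted.
        entry = field_size(len(key.encode("utf-8"))) + field_size(len(value.encode("utf-8")))
        size += field_size(entry)
    return size


def publish_checked(publish: Callable[..., object], topic: str, data: bytes,
                    max_bytes: int = 10_000_000, **attrs: str) -> object:
    """Raise ValueError instead of handing an oversized message to publish."""
    size = pubsub_message_size(data, attrs)
    if size > max_bytes:
        raise ValueError(f"Message of {size} bytes exceeds max_bytes={max_bytes}")
    return publish(topic, data, **attrs)


# Usage with a stand-in for PublisherClient.publish:
sent = []


def fake_publish(topic: str, data: bytes, **attrs: str) -> None:
    sent.append((topic, data, attrs))


publish_checked(fake_publish, "projects/p/topics/t", b"hello", origin="test")
try:
    publish_checked(fake_publish, "projects/p/topics/t", b"x" * 64, max_bytes=32)
except ValueError as exc:
    print(exc)
```

The size computed here is discarded once publish is called, because the client builds its own PubsubMessage from data and attributes. That duplicated work is what the strict option would avoid.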

Metadata


Labels

api: pubsub (issues related to the Pub/Sub API); triaged for GA; type: feature request (‘nice-to-have’ improvement, new feature, or different behavior or design)
