Skip to content

Commit 46f091f

Browse files
authored
deprecate(pubsub/v2): remove and rename existing settings (#11375)
* deprecate(pubsub/v2): deprecate and rename user settings * remove more code related to synchronous and byte limit
1 parent 950d3f3 commit 46f091f

File tree

7 files changed

+53
-215
lines changed

7 files changed

+53
-215
lines changed

pubsub/loadtest/loadtest.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,9 @@ func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartR
6666
func (l *PubServer) init(c *pubsub.Client, topicName string, msgSize, batchSize int32, batchDur time.Duration, ordered bool) {
6767
topic := c.Topic(topicName)
6868
topic.PublishSettings = pubsub.PublishSettings{
69-
DelayThreshold: batchDur,
70-
CountThreshold: 950,
71-
ByteThreshold: 9500000,
72-
BufferedByteLimit: 2e9,
69+
DelayThreshold: batchDur,
70+
CountThreshold: 950,
71+
ByteThreshold: 9500000,
7372
}
7473
topic.EnableMessageOrdering = ordered
7574

pubsub/v2/integration_test.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,12 @@ func TestIntegration_PublishReceive(t *testing.T) {
9898
ctx := context.Background()
9999
client := integrationTestClient(ctx, t)
100100

101-
for _, sync := range []bool{false, true} {
102-
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
103-
testPublishAndReceive(t, client, maxMsgs, sync, false, 10, 0)
104-
}
105-
106-
// Tests for large messages (larger than the 4MB gRPC limit).
107-
testPublishAndReceive(t, client, 0, sync, false, 1, 5*1024*1024)
101+
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
102+
testPublishAndReceive(t, client, maxMsgs, false, 10, 0)
108103
}
104+
105+
// Tests for large messages (larger than the 4MB gRPC limit).
106+
testPublishAndReceive(t, client, 0, false, 1, 5*1024*1024)
109107
}
110108

111109
// withGoogleClientInfo sets the name and version of the application in
@@ -126,8 +124,8 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
126124
return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
127125
}
128126

129-
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
130-
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
127+
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
128+
t.Run(fmt.Sprintf("maxMsgs:%d,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
131129
t.Parallel()
132130
testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) {
133131
ctx := context.Background()
@@ -193,7 +191,6 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
193191
}
194192

195193
sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
196-
sub.ReceiveSettings.Synchronous = synchronous
197194

198195
// Use a timeout to ensure that Pull does not block indefinitely if there are
199196
// unexpectedly few messages available.
@@ -219,8 +216,8 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
219216
got[md.ID] = md
220217
}
221218
if !testutil.Equal(got, want) {
222-
r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
223-
maxMsgs, synchronous, got, want)
219+
r.Errorf("MaxOutstandingMessages=%d, messages got: %+v, messages want: %+v",
220+
maxMsgs, got, want)
224221
}
225222
})
226223
})
@@ -601,7 +598,6 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
601598
t.Fatalf("topic %v should exist, but it doesn't", topic)
602599
}
603600

604-
topic.PublishSettings.BufferedByteLimit = 100
605601
topic.EnableMessageOrdering = true
606602

607603
orderingKey := "some-ordering-key2"
@@ -792,7 +788,7 @@ func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
792788
client := integrationTestClient(ctx, t)
793789

794790
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
795-
testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
791+
testPublishAndReceive(t, client, maxMsgs, true, 10, 0)
796792
}
797793
}
798794

pubsub/v2/iterator.go

Lines changed: 14 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@ import (
2727
"cloud.google.com/go/pubsub/internal/distribution"
2828
vkit "cloud.google.com/go/pubsub/v2/apiv1"
2929
pb "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
30-
gax "github.com/googleapis/gax-go/v2"
3130
"github.com/googleapis/gax-go/v2/apierror"
3231
"go.opentelemetry.io/otel/attribute"
3332
"go.opentelemetry.io/otel/propagation"
3433
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
3534
"go.opentelemetry.io/otel/trace"
36-
"google.golang.org/grpc"
3735
"google.golang.org/grpc/codes"
3836
"google.golang.org/grpc/status"
3937
"google.golang.org/protobuf/encoding/protowire"
@@ -126,16 +124,10 @@ type messageIterator struct {
126124
// Stop must be called on the messageIterator when it is no longer needed.
127125
// The iterator always uses the background context for acking messages and extending message deadlines.
128126
func newMessageIterator(subc *vkit.SubscriptionAdminClient, subName string, po *pullOptions) *messageIterator {
129-
var ps *pullStream
130-
if !po.synchronous {
131-
maxMessages := po.maxOutstandingMessages
132-
maxBytes := po.maxOutstandingBytes
133-
if po.useLegacyFlowControl {
134-
maxMessages = 0
135-
maxBytes = 0
136-
}
137-
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.clientID, maxMessages, maxBytes, po.maxExtensionPeriod)
138-
}
127+
maxMessages := po.maxOutstandingMessages
128+
maxBytes := po.maxOutstandingBytes
129+
130+
ps := newPullStream(context.Background(), subc.StreamingPull, subName, po.clientID, maxMessages, maxBytes, po.maxExtensionPeriod)
139131
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
140132
// the first keepAlive halfway towards the minimum ack deadline.
141133
keepAlivePeriod := minDurationPerLeaseExtension / 2
@@ -270,21 +262,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
270262

271263
var rmsgs []*pb.ReceivedMessage
272264
var err error
273-
if it.po.synchronous {
274-
rmsgs, err = it.pullMessages(maxToPull)
275-
} else {
276-
rmsgs, err = it.recvMessages()
277-
// If stopping the iterator results in the grpc stream getting shut down and
278-
// returning an error here, treat the same as above and return EOF.
279-
// If the cancellation comes from the underlying grpc client getting closed,
280-
// do propagate the cancellation error.
281-
// See https://github.com/googleapis/google-cloud-go/pull/10153#discussion_r1600814775
282-
if err != nil && errors.Is(it.ps.ctx.Err(), context.Canceled) {
265+
rmsgs, err = it.recvMessages()
266+
// If stopping the iterator results in the grpc stream getting shut down and
267+
// returning an error here, treat the same as above and return EOF.
268+
// If the cancellation comes from the underlying grpc client getting closed,
269+
// do propagate the cancellation error.
270+
// See https://github.com/googleapis/google-cloud-go/pull/10153#discussion_r1600814775
271+
if err != nil {
272+
if errors.Is(it.ps.ctx.Err(), context.Canceled) {
283273
err = io.EOF
284274
}
285-
}
286-
// Any error here is fatal.
287-
if err != nil {
275+
// Any error here is fatal.
288276
return nil, it.fail(err)
289277
}
290278

@@ -396,27 +384,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
396384
return nil, nil
397385
}
398386

399-
// Get messages using the Pull RPC.
400-
// This may block indefinitely. It may also return zero messages, after some time waiting.
401-
func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
402-
// Use it.ctx as the RPC context, so that if the iterator is stopped, the call
403-
// will return immediately.
404-
res, err := it.subc.Pull(it.ctx, &pb.PullRequest{
405-
Subscription: it.subName,
406-
MaxMessages: maxToPull,
407-
}, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
408-
switch {
409-
case errors.Is(err, context.Canceled):
410-
return nil, nil
411-
case status.Code(err) == codes.Canceled:
412-
return nil, nil
413-
case err != nil:
414-
return nil, err
415-
default:
416-
return res.ReceivedMessages, nil
417-
}
418-
}
419-
420387
func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
421388
res, err := it.ps.Recv()
422389
if err != nil {
@@ -510,7 +477,7 @@ func (it *messageIterator) sender() {
510477
case <-it.pingTicker.C:
511478
it.mu.Lock()
512479
// Ping only if we are processing messages via streaming.
513-
sendPing = !it.po.synchronous
480+
sendPing = true
514481
case <-it.receiptTicker.C:
515482
it.mu.Lock()
516483
sendReceipt = (len(it.pendingReceipts) > 0)

pubsub/v2/iterator_test.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -402,27 +402,6 @@ func TestIterator_ModifyAckContextDeadline(t *testing.T) {
402402
}
403403
}
404404

405-
func TestIterator_SynchronousPullCancel(t *testing.T) {
406-
srv := pstest.NewServer()
407-
ctx, cancel := context.WithCancel(context.Background())
408-
defer cancel()
409-
410-
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
411-
412-
_, client, err := initConn(ctx, srv.Addr)
413-
if err != nil {
414-
t.Fatal(err)
415-
}
416-
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})
417-
418-
// Cancelling the iterator and pulling should not result in any errors.
419-
iter.cancel()
420-
421-
if _, err := iter.pullMessages(100); err != nil {
422-
t.Fatalf("Got error in pullMessages: %v", err)
423-
}
424-
}
425-
426405
func TestIterator_BoundedDuration(t *testing.T) {
427406
// Use exported fields for time.Duration fields so they
428407
// print nicely. Otherwise, they will print as integers.
@@ -529,7 +508,6 @@ func TestIterator_StreamingPullExactlyOnce(t *testing.T) {
529508
srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil)
530509

531510
iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{
532-
synchronous: false,
533511
maxOutstandingMessages: 100,
534512
maxOutstandingBytes: 1e6,
535513
maxPrefetch: 30,

0 commit comments

Comments
 (0)