Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,10 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> float:
self._ack_deadline = max(
self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
)

# If we have updated the ack_deadline and it is longer than the stream_ack_deadline
# set the stream_ack_deadline to the new ack_deadline.
if self._ack_deadline > self._stream_ack_deadline:
self._stream_ack_deadline = self._ack_deadline
return self._ack_deadline

@property
Expand Down Expand Up @@ -818,7 +821,7 @@ def open(
)

# Create the RPC
stream_ack_deadline_seconds = self.ack_deadline
stream_ack_deadline_seconds = self._stream_ack_deadline

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension():
# The deadline configured in flow control should prevail.
deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MAX_ACK_DEADLINE
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE


def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low():
Expand All @@ -283,6 +284,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_too_high():
# The deadline configured in flow control should be adjusted to the maximum allowed.
deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MAX_ACK_DEADLINE
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE


def test__obtain_ack_deadline_with_exactly_once_enabled():
Expand All @@ -299,6 +301,7 @@ def test__obtain_ack_deadline_with_exactly_once_enabled():
# Since the 60-second min ack_deadline value for exactly_once subscriptions
# seconds is higher than the histogram value, the deadline should be 60 sec.
assert deadline == 60
assert manager._stream_ack_deadline == 60


def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly_once_enabled():
Expand All @@ -316,6 +319,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly
# User-defined custom min ack_deadline value takes precedence over
# exactly_once default of 60 seconds.
assert deadline == histogram.MAX_ACK_DEADLINE
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE


def test__obtain_ack_deadline_no_value_update():
Expand Down Expand Up @@ -1148,7 +1152,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == 18
assert initial_request_arg.args[0] == 60
assert not manager._client.get_subscription.called

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
Expand Down Expand Up @@ -1833,6 +1837,7 @@ def test__on_response_disable_exactly_once():
# exactly_once minimum since exactly_once has been disabled.
deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MIN_ACK_DEADLINE
assert manager._stream_ack_deadline == 60


def test__on_response_exactly_once_immediate_modacks_fail():
Expand Down