Skip to content
64 changes: 63 additions & 1 deletion pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import absolute_import

import datetime
import time
import uuid

Expand Down Expand Up @@ -103,4 +104,65 @@ def test_subscribe_to_messages():
assert callback.call_count >= 50
finally:
publisher.delete_topic(topic_name)
subscriber.delete_subscription(sub_name)


def test_subscribe_to_messages_async_callbacks():
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_name = _resource_name('topic')
sub_name = _resource_name('subscription')

try:
# Create a topic.
publisher.create_topic(topic_name)

# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(sub_name, topic_name)
subscription = subscriber.subscribe(sub_name)

# Publish some messages.
futures = [publisher.publish(
topic_name,
b'Wooooo! The claaaaaw!',
num=str(i),
) for i in range(0, 2)]

# Make sure the publish completes.
[f.result() for f in futures]

# We want to make sure that the callback was called asynchronously. So
# track when each call happened and make sure below.
call_times = []

def process_message(message):
# list.append() is thread-safe.
call_times.append(datetime.datetime.now())
time.sleep(2)
message.ack()

callback = mock.Mock(wraps=process_message)
side_effect = mock.Mock()
callback.side_effect = side_effect

# Actually open the subscription and hold it open for a few seconds.
subscription.open(callback)
for second in range(0, 5):
time.sleep(4)

# The callback should have fired at least two times, but it may
# take some time.
if callback.call_count >= 2 and side_effect.call_count >= 2:
first = min(call_times[:2])
last = max(call_times[:2])
diff = last - first
# "Ensure" the first two callbacks were executed asynchronously
# (sequentially would have resulted in a difference of 2+
# seconds).
assert diff.days == 0
assert diff.seconds < 2

# Okay, we took too long; fail out.
assert callback.call_count >= 2
finally:
publisher.delete_topic(topic_name)
35 changes: 29 additions & 6 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,15 @@ def test_on_exception_other():
def test_on_response():
callback = mock.Mock(spec=())

# Create mock ThreadPoolExecutor, pass into create_policy(), and verify
# that both executor.submit() and future.add_done_callback are called
# twice.
future = mock.Mock()
attrs = {'submit.return_value': future}
executor = mock.Mock(**attrs)

# Set up the policy.
policy = create_policy()
policy = create_policy(executor=executor)
policy._callback = callback

# Set up the messages to send.
Expand All @@ -112,9 +119,25 @@ def test_on_response():
],
)

# Actually run the method and prove that the callback was
# called in the expected way.
# Actually run the method and prove that executor.submit and
# future.add_done_callback were called in the expected way.
policy.on_response(response)
assert callback.call_count == 2
for call in callback.mock_calls:
assert isinstance(call[1][0], message.Message)

submit_calls = [m for m in executor.method_calls if m[0] == 'submit']
assert len(submit_calls) == 2
for call in submit_calls:
assert call[1][0] == callback
assert isinstance(call[1][1], message.Message)

add_done_callback_calls = [
m for m in future.method_calls if m[0] == 'add_done_callback']
assert len(add_done_callback_calls) == 2
for call in add_done_callback_calls:
assert call[1][0] == thread._callback_completed


def test__callback_completed():
future = mock.Mock()
thread._callback_completed(future)
result_calls = [m for m in future.method_calls if m[0] == 'result']
assert len(result_calls) == 1