Skip to content

Commit 3a22f99

Browse files
committed
Merge pull request #85 from vmarkovtsev/master
Add configurable timeouts
2 parents 70aeac8 + 8255edd commit 3a22f99

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

rabbitpy/connection.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class Connection(base.StatefulObject):
8383
"""
8484
CANCEL_METHOD = ['Basic.Cancel']
8585
DEFAULT_CHANNEL_MAX = 65535
86+
DEFAULT_TIMEOUT = 3
8687
DEFAULT_HEARTBEAT_INTERVAL = 300
8788
DEFAULT_LOCALE = 'en_US'
8889
DEFAULT_URL = 'amqp://guest:guest@localhost:5672/%2F'
@@ -257,7 +258,6 @@ def _close_channels(self):
257258
def _connect(self):
258259
"""Connect to the RabbitMQ Server"""
259260
self._set_state(self.OPENING)
260-
261261
# Create and start the IO object that reads, writes & dispatches frames
262262
self._io = self._create_io_thread()
263263
self._io.daemon = True
@@ -268,7 +268,12 @@ def _connect(self):
268268
if not self._exceptions.empty():
269269
exception = self._exceptions.get()
270270
raise exception
271-
self._events.wait(events.SOCKET_OPENED)
271+
# https://docs.python.org/2.6/library/threading.html#threading.Event.wait # nopep8
272+
# Event.wait() always retuns None in 2.6, so it is impossible
273+
# to simply check wait() result
274+
self._events.wait(events.SOCKET_OPENED, self._args['timeout'])
275+
if not self._events.is_set(events.SOCKET_OPENED):
276+
raise RuntimeError("Timeout waiting for opening the socket")
272277

273278
# If the socket could not be opened, return instead of waiting
274279
if self.closed:
@@ -493,13 +498,19 @@ def _process_url(self, url):
493498
specification.FRAME_MAX_SIZE)
494499
heartbeat_interval = int(query_values.get('heartbeat', [None])[0] or
495500
self.DEFAULT_HEARTBEAT_INTERVAL)
501+
# DEFAULT_TIMEOUT does not have to be 0, so explicitly setting 0
502+
# (False-ish) should not lead to using it, thus no "or" here but
503+
# the precise check against None.
504+
timeout = query_values.get('timeout', [None])[0]
505+
timeout = self.DEFAULT_TIMEOUT if timeout is None else float(timeout)
496506

497507
# Return the configuration dictionary to use when connecting
498508
return {'host': parsed.hostname,
499509
'port': port,
500510
'virtual_host': utils.unquote(vhost),
501511
'username': parsed.username or self.GUEST,
502512
'password': parsed.password or self.GUEST,
513+
'timeout': timeout,
503514
'heartbeat': heartbeat_interval,
504515
'frame_max': frame_max,
505516
'channel_max': channel_max,

rabbitpy/io.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,6 @@ def _write(self):
280280

281281

282282
class IO(threading.Thread, base.StatefulObject):
283-
284-
CONNECTION_TIMEOUT = 3
285283
CONTENT_METHODS = ['Basic.Deliver', 'Basic.GetOk']
286284
READ_BUFFER_SIZE = specification.FRAME_MAX_SIZE
287285
SSL_KWARGS = {'keyfile': 'keyfile',
@@ -435,7 +433,7 @@ def _close(self):
435433
def _connect_socket(self, sock, address):
436434
"""Connect the socket to the specified host and port."""
437435
LOGGER.debug('Connecting to %r', address)
438-
sock.settimeout(self.CONNECTION_TIMEOUT)
436+
sock.settimeout(self._args['timeout'])
439437
sock.connect(address)
440438

441439
def _connect(self):
@@ -465,7 +463,6 @@ def _connect(self):
465463
return
466464

467465
self._socket = sock
468-
self._socket.settimeout(0)
469466
self._events.set(events.SOCKET_OPENED)
470467
self._set_state(self.OPEN)
471468

0 commit comments

Comments
 (0)