Skip to content

Commit 0f76180

Browse files
committed
fix recreate binding on failure
1 parent 54b4382 commit 0f76180

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

src/socketio/async_aiopika_manager.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,26 +101,26 @@ async def _publish(self, data):
101101
raise asyncio.CancelledError()
102102

103103
async def _listen(self):
104-
async with (await self._connection()) as connection:
105-
channel = await self._channel(connection)
106-
await channel.set_qos(prefetch_count=1)
107-
exchange = await self._exchange(channel)
108-
queue = await self._queue(channel, exchange)
109-
110-
retry_sleep = 1
111-
while True:
112-
try:
104+
retry_sleep = 1
105+
while True:
106+
try:
107+
async with (await self._connection()) as connection:
108+
channel = await self._channel(connection)
109+
await channel.set_qos(prefetch_count=1)
110+
exchange = await self._exchange(channel)
111+
queue = await self._queue(channel, exchange)
112+
113113
async with queue.iterator() as queue_iter:
114114
async for message in queue_iter:
115115
async with message.process():
116116
yield message.body
117117
retry_sleep = 1
118-
except aio_pika.AMQPException:
119-
self._get_logger().error(
120-
'Cannot receive from rabbitmq... '
121-
'retrying in {} secs'.format(retry_sleep))
122-
await asyncio.sleep(retry_sleep)
123-
retry_sleep = min(retry_sleep * 2, 60)
124-
except aio_pika.exceptions.ChannelInvalidStateError:
125-
# aio_pika raises this exception when the task is cancelled
126-
raise asyncio.CancelledError()
118+
except aio_pika.AMQPException:
119+
self._get_logger().error(
120+
'Cannot receive from rabbitmq... '
121+
'retrying in {} secs'.format(retry_sleep))
122+
await asyncio.sleep(retry_sleep)
123+
retry_sleep = min(retry_sleep * 2, 60)
124+
except aio_pika.exceptions.ChannelInvalidStateError:
125+
# aio_pika raises this exception when the task is cancelled
126+
raise asyncio.CancelledError()

src/socketio/kombu_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ def _publish(self, data):
115115
break
116116

117117
def _listen(self):
118-
reader_queue = self._queue()
119118
retry_sleep = 1
120119
while True:
121120
try:
121+
reader_queue = self._queue()
122122
with self._connection() as connection:
123123
with connection.SimpleQueue(reader_queue) as queue:
124124
while True:

0 commit comments

Comments
 (0)