-
Notifications
You must be signed in to change notification settings - Fork 57
Closed
Description
I'm stress-testing my reconnect logic, and I appear to have managed to get the interface wedged in the rabbitpy somewhere.
Basically, the steps were as followed:
- The AMQP connection was killed via the RabbitMQ web management console (the 'Force Close' button)
- Active consumers notice the connection failure, set a termination flag.
- Manager thread tears down consumer threads, calls
interface.close()on the my connection manager. - Connection manager iterates over the instantiated connection components (two
rabbitpy.Queueinstances, arabbitpy.Channelinstance, and the baserabbitpy.Connectioninstance), calling<instance>.close()on each, in the reverse order they were constructed. - One of the
xxx.close()calls wedges.
I can get a remote traceback from the wedged thread using pystuck. The thread is doing something, as the traceback is changing, and it's consuming 100% CPU. Here are some of the sniffed tracebacks:
<Thread(Thread-3, started daemon 139660919564032)>
File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
self._disconnect()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
self.interface.close()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
func()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
super(Channel, self).close()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
self.rpc(frame_value)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
return self._wait_on_frame(frame_value.valid_responses)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 455, in _wait_on_frame
self._read_queue.put(value)
File "/usr/lib/python3.5/queue.py", line 145, in put
self.not_empty.notify()
File "/usr/lib/python3.5/threading.py", line 347, in notify
return
<Thread(Thread-3, started daemon 139660919564032)>
File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
self._disconnect()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
self.interface.close()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
func()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
super(Channel, self).close()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
self.rpc(frame_value)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
return self._wait_on_frame(frame_value.valid_responses)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 449, in _wait_on_frame
value = self._read_from_queue()
File "/usr/lib/python3.5/queue.py", line 145, in put
self.not_empty.notify()
<Thread(Thread-3, started daemon 139660919564032)>
File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
self._disconnect()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
self.interface.close()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
func()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
super(Channel, self).close()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
self.rpc(frame_value)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
return self._wait_on_frame(frame_value.valid_responses)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 449, in _wait_on_frame
value = self._read_from_queue()
File "/usr/lib/python3.5/queue.py", line 145, in put
self.not_empty.notify()
File "/usr/lib/python3.5/threading.py", line 241, in __exit__
return self._lock.__exit__(*args)
<Thread(Thread-3, started daemon 139660919564032)>
File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
self._disconnect()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
self.interface.close()
File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
func()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
super(Channel, self).close()
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
self.rpc(frame_value)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
return self._wait_on_frame(frame_value.valid_responses)
File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 455, in _wait_on_frame
self._read_queue.put(value)
Note that this is with a local copy of rabbitpy in my project's directory, because I wanted to be able to poke around and see if I could debug the issues (related to #101).
Metadata
Metadata
Assignees
Labels
No labels