Allow to register application background tasks within an event loop #1092

@f0t0n

Long story short

I run an aiohttp application with Gunicorn behind nginx.
The application exposes two websocket endpoints for mobile clients: one for RPC and the other for Pub/Sub.

When some event occurs in the system (e.g. the company has news for its clients), I want to notify all the connected websockets, which I store in e.g. app['pub_sub_websockets']. So basically I would iterate over them in a loop and send bytes to each one.

Each application instance that Gunicorn runs in a separate process has its own collection of connected websockets. But I have to notify the websockets connected to all workers (or even to all workers on all server nodes). Therefore I would proxy the event through some messaging system like Redis Pub/Sub or ZeroMQ. In my case it's ZeroMQ.

The problem is that I can't easily set up a "listener" coroutine that runs within the application's event loop, subscribes to the ZeroMQ proxy, and forwards messages to the connected websockets.

Therefore I feel like I have to create a separate ZeroMQ zmq.SUB socket for each connected websocket inside my websocket request handler and gather two coroutines there: the first listening on the websocket and the second listening on the ZeroMQ socket. As a result, the number of zmq.SUB sockets grows linearly with the number of clients connected to the application process via websockets.
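
To make the per-connection workaround concrete, here is a minimal sketch of it using only asyncio. It is an illustration under assumptions, not working aiohttp or pyzmq code: the names ws_handler and demo are hypothetical, and plain asyncio.Queue objects stand in for both the websocket and the per-client zmq.SUB socket. It also cancels the forwarding task when the client disconnects rather than gathering both coroutines to completion.

```python
import asyncio


async def ws_handler(client_frames, subscription, sent):
    """Serve one client. `subscription` stands in for a per-client zmq.SUB socket."""

    async def read_client():
        # Stand-in for reading the websocket: None means the client closed.
        while await client_frames.get() is not None:
            pass

    async def forward_events():
        # Forward every broker message to this one client only.
        while True:
            sent.append(await subscription.get())

    reader = asyncio.ensure_future(read_client())
    forwarder = asyncio.ensure_future(forward_events())
    await reader                 # returns once the client disconnects
    forwarder.cancel()           # tear down this client's subscription
    await asyncio.gather(forwarder, return_exceptions=True)


async def demo(n_clients):
    frames = [asyncio.Queue() for _ in range(n_clients)]
    subs = [asyncio.Queue() for _ in range(n_clients)]  # one subscription per client
    sent = [[] for _ in range(n_clients)]
    handlers = [
        asyncio.ensure_future(ws_handler(f, s, out))
        for f, s, out in zip(frames, subs, sent)
    ]
    for s in subs:
        s.put_nowait(b"news")    # the broker has to deliver to every subscription
    while not all(sent):
        await asyncio.sleep(0)   # yield until each client got the message
    for f in frames:
        f.put_nowait(None)       # simulate every client disconnecting
    await asyncio.gather(*handlers)
    return len(subs), sent
```

Calling asyncio.run(demo(3)) creates three subscriptions for three clients, which is the linear growth described above.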

Even with this approach, I can't tell the application (actually GunicornWebWorker) to use zmq.asyncio.ZMQEventLoop, which is required for zmq.asyncio.

Expected behaviour

1) I propose that web.Application provide an interface like Application.register_background_task(coro) that allows registering as many tasks as needed during application setup. These tasks would run alongside GunicornWebWorker._runner within the event loop the worker creates, say using asyncio.gather().

This way, in my particular case, I'd create only one ZeroMQ socket that listens for a topic within the event loop, instead of one ZMQ socket per connected WebSocket.

And if I have to listen to more event sources (message queues from different providers, etc.), I'll add one more socket (or another consumer object, depending on the event source provider), not another thousand sockets for every thousand currently connected websockets.
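
A rough sketch of how the proposed hook might behave, assuming nothing about aiohttp internals. Everything here is hypothetical: SketchApp is a stand-in for web.Application, register_background_task is the proposed (not existing) method, FakeWebSocket records what would be sent, and an asyncio.Queue plays the role of the single zmq.SUB socket. This version takes a coroutine function and starts it as a task next to the main runner, cancelling it on shutdown; using asyncio.gather() for the whole thing would be another option.

```python
import asyncio


class FakeWebSocket:
    """Stand-in for a connected websocket; records what would be sent."""

    def __init__(self):
        self.sent = []

    async def send_bytes(self, data):
        self.sent.append(data)


class SketchApp(dict):
    """Hypothetical stand-in for web.Application with the proposed hook."""

    def __init__(self):
        super().__init__()
        self._background = []

    def register_background_task(self, coro_func):
        # Remember a coroutine function; it is called with the app at startup.
        self._background.append(coro_func)

    async def run(self, serve):
        # Start every registered task, then run the main server coroutine.
        tasks = [asyncio.ensure_future(f(self)) for f in self._background]
        try:
            await serve(self)
        finally:
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)


async def forward_events(app):
    # One subscription per process, however many clients are connected.
    while True:
        msg = await app["events"].get()
        for ws in list(app["pub_sub_websockets"]):
            await ws.send_bytes(msg)


async def demo():
    app = SketchApp()
    app["events"] = asyncio.Queue()  # stand-in for a single zmq.SUB socket
    app["pub_sub_websockets"] = [FakeWebSocket() for _ in range(3)]
    app.register_background_task(forward_events)

    async def serve(app):
        # Stand-in for the worker's runner: publish one event, wait for delivery.
        app["events"].put_nowait(b"news")
        while not all(ws.sent for ws in app["pub_sub_websockets"]):
            await asyncio.sleep(0)

    await app.run(serve)
    return [ws.sent for ws in app["pub_sub_websockets"]]
```

Here a single listener serves all three clients, and adding another event source would mean registering one more task rather than one more socket per client.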

2) Also, I think there should be some API to choose which event loop class to use inside GunicornWebWorker.init_process().
If I understand the ZMQ documentation correctly, I must use zmq.asyncio.ZMQEventLoop to work with ZeroMQ in my asynchronous application, but I can't set this type of loop in the worker without creating my own worker class inherited from GunicornWebWorker.

Maybe the loop creation logic should be moved into its own public method that could be overridden, or there should just be an option to choose the event loop class used within the worker. By the way, maybe we then wouldn't need a separate GunicornUVLoopWebWorker class, if the event loop class could be set somewhere?
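
The "overridable public method" idea could look roughly like the sketch below. It is only an assumption about the shape of such an API: SketchWorker is a stand-in reduced to loop setup (it is not the real GunicornWebWorker), make_loop is a hypothetical hook name, and CustomLoop is a placeholder for whichever loop class is needed, such as zmq.asyncio.ZMQEventLoop.

```python
import asyncio


class SketchWorker:
    """Hypothetical stand-in for GunicornWebWorker, reduced to loop setup."""

    def make_loop(self):
        # Default: a plain asyncio event loop. Subclasses override this hook.
        return asyncio.new_event_loop()

    def init_process(self):
        # The worker asks the hook for its loop instead of hard-coding one.
        self.loop = self.make_loop()


class CustomLoop(asyncio.SelectorEventLoop):
    """Placeholder for an alternative loop class."""


class CustomLoopWorker(SketchWorker):
    def make_loop(self):
        # Only the loop creation changes; the rest of the worker is inherited.
        return CustomLoop()
```

With a hook like this, a worker for a different loop implementation would only need to override one small method, or the hook could be replaced by a configuration option naming the loop class.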

Please tell me whether it's possible to introduce the features described above in the aiohttp library. Or maybe my architectural approach itself is totally wrong? Then I'd be glad to hear some good guidance to correct it.
