Improve Async WebSocket #650
Conversation
> "Ah it must be better to use events, surely these things are light weight" > Looks under hood > self._get_loop().create_future() > self._waiters.append(fut) > Bruh
Cool, the improvements are significant. Thanks!
@lexiforest Unless I find anything else, the code should be good to review now 😎 I've updated the description.
Thanks, this is a huge PR, it could take some time to review. :)
Sounds good, I shall await :) (Also, whoops, I set the frame size to 1 MB; curl's max frame size is 64k according to their docs 😅)
Do you mind adding some of your benchmark code here? And also mention it in the README, if you think it helps.
Added the first benchmark, I will add the second one soon.
@lexiforest Wow that took a while, but we got there in the end. Both benchmarks are there. I've also added instructions on running them into the benchmark README. They should work cross-platform straight away. I've gone through and checked all the code and ran the usual set of linters and type checkers. It should now be ready for review 😊
Everyone asks what CI is doing, but no one asks *how* is CI?
I ran the final benchmark code without TLS just for fun:
Thanks to my friend's benchmarks on his Windows machine, I've made the code even more resilient. @lexiforest I will be travelling until the end of the month, so feel free to make any changes you see fit.
- Also: flushed 😳


Overview
This is a complete and fundamental rewrite of the Async WebSocket. This overhaul replaces the implementation with a fully asynchronous, task-based architecture that is idiomatic, highly performant, and hardened for production use. It now fails predictably and cleans up its resources correctly under all circumstances.
This pull request also addresses several lifecycle and error-handling bugs in the AsyncWebSocket implementation.
The key focus of this rewrite is to achieve the best possible performance while keeping the code clean and breaking changes to a minimum. That said, there are some breaking changes, so let's get those out of the way.
Breaking Changes
- `send()` error handling is now asynchronous: previously, a failed `send()` operation would raise an exception directly from the `await ws.send(...)` call. Send errors are now raised later, from a subsequent `await ws.recv()`. Application-level error handling must be adapted to this new model.
- `send()` is now non-blocking and queued: previously, `await ws.send(...)` would block until the data was passed to the OS socket buffer. Now `await ws.send(...)` returns almost instantly after placing the message in an internal queue. To guarantee that all queued messages have been picked up for sending, callers must now use the new `await ws.flush()` method (see the usage sketch after this list).
- `recv_fragment()` removed: the `recv_fragment` method is no longer part of the class. Message reassembly is now an internal implementation detail of the read loop.
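To make the new model concrete, here is a minimal usage sketch. It assumes an already-connected `ws` exposing the `send()`, `flush()`, and `recv()` behaviour described above; the error handling is deliberately generic, since concrete exception types aren't spelled out here.

```python
async def pump(ws, payloads):
    # Queue up all payloads; each send() returns almost immediately.
    for payload in payloads:
        await ws.send(payload)

    # Wait until every queued message has been picked up for sending.
    await ws.flush()

    try:
        while True:
            message = await ws.recv()  # send failures also surface here
            ...  # handle the incoming message
    except Exception as exc:
        # Under the asynchronous error model, a failed send may be raised
        # here rather than at the original `await ws.send(...)` call site.
        print(f"WebSocket error: {exc!r}")
```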
Performance

The new implementation completely does away with the use of `run_in_executor`. The original issue I raised (#645) found that the overhead from this was massive and took up most of the time, leading to very low throughput figures. From reading the original code, it became clear that the curl socket is non-blocking and simply returns EAGAIN when it is not ready to be read from/written to.

This implementation handles those EAGAINs and uses a similar behaviour to the `aselect(...)` helper to wait on FD availability. Instead of running the libcurl calls in a thread pool to avoid blocking, the code opts for a cooperative yielding approach: every N I/O operations, or after a certain amount of time has elapsed, it yields control to the event loop so that other tasks have a fair chance to run.

This is obviously a double-edged sword. Yielding too often to the event loop lowers throughput, while hogging the event loop starves other tasks. I've set values which I believe strike a good balance between throughput and yielding control fairly.
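As a rough illustration of the cooperative yielding idea (the constants and the `do_io` callback are illustrative assumptions, not the actual values or hooks used in this PR):

```python
import asyncio
import time

YIELD_EVERY_N_OPS = 64       # yield to the loop after this many consecutive I/O ops
YIELD_AFTER_SECONDS = 0.005  # ...or after this much time spent without yielding


async def drain(do_io) -> None:
    """Call a non-blocking I/O step repeatedly, yielding to the loop periodically.

    `do_io` returns False once the socket reports EAGAIN, i.e. no more progress
    can be made until the FD becomes ready again.
    """
    ops = 0
    last_yield = time.monotonic()
    while do_io():
        ops += 1
        now = time.monotonic()
        if ops >= YIELD_EVERY_N_OPS or now - last_yield >= YIELD_AFTER_SECONDS:
            await asyncio.sleep(0)  # give other tasks a fair chance to run
            ops = 0
            last_yield = time.monotonic()
```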
The current implementation uses background I/O tasks for reading and writing, which decouples the I/O operations from the public API. That means when you call `send(...)` or `recv(...)`, you are simply performing a lightweight operation which adds the WS frame to (or takes it from) an asyncio queue. When the FD is next available for reading or writing, the I/O task plucks the frame from the queue and writes it to the curl socket, and vice versa. This aims to be a best-of-both-worlds approach, suitable for both continuous and sparse messages, by engaging CPU time only when it is needed and waiting efficiently when idle.

Due to the decoupling, it completely eliminates the crash risk when calling send/recv concurrently. Since the public API only operates on async queues, which by their very nature are designed for concurrent use, there is no risk of crashing this way. As long as there is only one instance of the I/O loop tasks, there will only ever be one reader/writer interacting with the curl socket sequentially, removing this risk.
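For orientation only, here is a stripped-down sketch of what the decoupled write side looks like; the class and method names are hypothetical, not this PR's actual internals.

```python
import asyncio


class QueuedWriter:
    """Public send() only enqueues; a single background task does the socket I/O."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._write_loop())

    async def send(self, frame: bytes) -> None:
        # Lightweight and safe to call concurrently: just put the frame on the queue.
        await self._queue.put(frame)

    async def _write_loop(self) -> None:
        # The only place that touches the curl socket, so writes stay sequential.
        while True:
            frame = await self._queue.get()
            await self._write_frame(frame)

    async def _write_frame(self, frame: bytes) -> None:
        ...  # wait for FD writability, call ws_send(), handle EAGAIN/partial writes
```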
On the write side, we are using some optimization techniques, including adaptive batching and (optional) send coalescing, to try to reduce the overhead and the number of system calls made. These techniques do actually work, but in this case the benefits are limited, as there seems to be a hard cap on send throughput (around 40 MiB/s on my machine). During development, I've taken tens (if not hundreds) of different cProfile performance captures, and no matter what I do, `send()` throughput is hard limited by curl's `ws_send()` call. It is likely that it cannot be improved further.

In case you are wondering: does the send coalescing actually work? Yes!
Here is a cProfile with the flag turned on, sending 10 GiB of data with a 65k chunk size:

Here is another cProfile of the same transfer but without the feature:

The throughput is hard capped on my machine, so it is not that much faster, but see the difference? That's 10188 calls to `ws_send()` with the feature vs 162870 function calls without. Also, more than 95% of the time is spent in the blocking C function, and there is little overhead from the Python side.
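For illustration, here is roughly what coalescing means at the queue level; the helper, limit, and behaviour are a simplified assumption, not the PR's actual implementation (which also has to respect message types and adaptive batching).

```python
MAX_COALESCED = 64 * 1024  # stay within curl's 64k max frame size


def coalesce(pending: list[bytes]) -> list[bytes]:
    """Merge queued payloads into fewer, larger frames.

    Frame boundaries are not preserved, which is why this is opt-in and only
    suitable for streaming protocols that treat the connection as a byte stream.
    """
    merged: list[bytes] = []
    current = bytearray()
    for chunk in pending:
        if current and len(current) + len(chunk) > MAX_COALESCED:
            merged.append(bytes(current))
            current = bytearray()
        current.extend(chunk)
    if current:
        merged.append(bytes(current))
    return merged
```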
With streaming protocols which do not care about frame boundaries, this works nicely for smaller chunks. Here is the speed of that same transfer with a 2048-byte chunk size, without and with `coalesce_frames=True`:

On the curl FFI side, I've made one key optimization which significantly speeds up receive throughput. Rather than creating a new `ffi.buffer(...)` object for each fragment received, it now pre-allocates and reuses that buffer. This speeds things up a lot by avoiding repeated memory allocations in a tight loop. It is safe to do, since the buffer is only read up to a bounded length, so it does not need to be cleared or reallocated each time. Also, a bytes copy is made when the function returns, so there are no referencing issues. I tried making it return a `memoryview(ffi.buffer(...))`, and it was quite a bit faster, but it ran into referencing issues since the view points at the pre-allocated buffer, meaning any change there changes what all the memoryview instances are pointing to. Although the idea of copying bytes doesn't sound great, in reality every option other than returning a no-copy view into the buffer is slower. I tried returning `bytearray` objects, and using a pre-allocated `bytearray` and extending the fragments into it, but both end up being slower. In the end, keeping a list of bytes and calling `b"".join(chunks)` was the fastest way to collect the fragments into one complete message.
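A generic cffi sketch of the reuse-and-copy pattern described above (buffer name, size, and call shape are assumptions; curl_cffi's actual FFI layer differs in detail):

```python
from cffi import FFI

ffi = FFI()

MAX_FRAGMENT = 64 * 1024  # fragments are bounded, so one fixed buffer suffices

# Allocate the C buffer and the Python view over it once, outside the hot loop.
_c_buf = ffi.new("char[]", MAX_FRAGMENT)
_py_buf = ffi.buffer(_c_buf, MAX_FRAGMENT)


def take_fragment(n_received: int) -> bytes:
    """Copy the first n_received bytes out of the reused buffer.

    The bytes copy is what makes reuse safe: the next read overwrites _c_buf,
    but the returned object no longer references it. Returning
    memoryview(ffi.buffer(...)) would avoid the copy but alias the shared buffer.
    """
    return bytes(_py_buf[:n_received])


# Fragments are then collected into a list and joined into one complete message:
# message = b"".join(fragments)
```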
Benchmark Code

Now let's get to the interesting part. The numbers tell their own story.
In order to arrive at these numbers, I am running two different sets of benchmarks.
The first benchmark (`client.py` and `server_ssl.py`) simply sends and/or receives in a loop (depending on what code is commented out) and doesn't do anything other than time how long it took.

The second benchmark (`benchmark.py`) is slightly more realistic. It reads a file from disk into memory and loads its hash. Depending on whether it was run in server mode or client mode, it will receive or send that file; the other end calculates a hash of the received data, compares it against the loaded hash of the file on disk, and lets you know whether it matches. This will be slower due to the higher overhead.

Neither benchmark is good and the benchmark code is bad. They are simply a rough proof of concept.
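For reference, the upload side of the first benchmark boils down to something like this; the payload size, total size, and the already-connected `ws` object are assumptions rather than a copy of `client.py`.

```python
import time

CHUNK = b"\x00" * 65536   # 64 KiB payload
TOTAL = 1 << 30           # 1 GiB per run


async def upload_benchmark(ws) -> None:
    sent = 0
    start = time.perf_counter()
    while sent < TOTAL:
        await ws.send(CHUNK)
        sent += len(CHUNK)
    await ws.flush()  # ensure queued sends were picked up before stopping the clock
    elapsed = time.perf_counter() - start
    print(f"{sent / 2**20:.0f} MiB in {elapsed:.2f}s -> {sent / 2**20 / elapsed:.1f} MiB/s")
```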
I am using `btop` to monitor throughput. I am running each test more than once, noting that the first run is sometimes measurably slower, possibly due to the CPU cache. Either way, a 5~10% variation on these figures should be factored in. I am using an HPE DL-60 Gen 9 rack server. This is an ancient server which clocks up to 1.6 GHz, so your results may vary. One slightly nice aspect is that I have 80 GB of RAM in this server, so I can run the benchmarks with 20 GB files loaded in memory without any issues.

Files
Slow Case
The slow case is where the chunk size in all the benchmarks has been set to a small value, i.e. 2048 bytes. This increases the number of libcurl calls needed to transfer the same amount of data and slows down the throughput.

Download

`client.py`:

`benchmark.py`:

Upload

`client.py`:

`benchmark.py`:

Average Case
The average case only considers `benchmark.py` results with a larger chunk size of 65536 bytes.

Download
Upload
Best Case
These numbers are unrealistic and unlikely to be achievable in a real application. This only considers the `client.py` benchmark, which effectively just loops and does absolutely nothing with the received/sent data. Any application processing logic will add overhead and slow down the throughput. These figures are included here as an optimistic "what if there were nothing else, how fast could we go?" scenario. The chunk size is 65536 bytes.

Download
My friend also ran this benchmark on his Windows machine which has a modern CPU that is about 10 years newer than my poor server:
Upload
Concurrent
Here is a test from the same benchmark, sending and receiving concurrently (without/with frame coalescing):
Closing Note
Should you trust my benchmarks and statements? Absolutely not.
It's always best to run your own benchmarks (and double-check the code yourself), since I have no idea what your use case is. I am benchmarking on localhost to eliminate network conditions as a factor, but it may be that real internet conditions impact the numbers significantly, in which case the relevant changes should be made.

Also, if you think the current `send(...)` API does not provide adequate guarantees of transmission, I can change it so that:

- When `send(...)` is called, it inserts the content to be sent, along with an `asyncio.Future` object, into the send queue.
- The `send(...)` method then either returns a reference to the `Future` object or awaits it directly.
- When `ws_send(...)` is called, the `Future` result is set to either the number of bytes sent or an `Exception` object.

That would guarantee that the item has been sent out, but it would increase memory pressure and slow down sending, so the trade-off should be considered. If it's needed, it can be done as part of this PR or in a follow-up PR. I also need to think through how that would work for coalesced sends (see the sketch below).
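A hedged sketch of what that Future-per-send variant could look like (queue item shape and names are hypothetical, not code from this PR):

```python
import asyncio


class AckingSender:
    """send() resolves only once the writer task has handed the payload to curl."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, payload: bytes) -> int:
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((payload, fut))
        return await fut  # number of bytes sent, or raises the send error

    async def _writer_loop(self, ws_send) -> None:
        while True:
            payload, fut = await self._queue.get()
            try:
                fut.set_result(ws_send(payload))
            except Exception as exc:
                fut.set_exception(exc)
```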
There is definitely room for improvement with this code and the architecture overall. I acknowledge that this is just one person's attempt at making a useful library better (although various LLMs were used to review the code and find improvements).
Feel free to suggest improvements. I will do my best to integrate them and ensure the code is in the best state possible.