60 commits
19ebcce
Start working on WS improvements
Sh3llcod3 Oct 4, 2025
144e6bb
Add in cooperative yielding to prevent one coroutine taking too much …
Sh3llcod3 Oct 4, 2025
cabb59d
Start to cleanup the code a bit
Sh3llcod3 Oct 4, 2025
f481532
Further changes
Sh3llcod3 Oct 4, 2025
31d916b
This is the fastest code yet.
Sh3llcod3 Oct 5, 2025
da43868
Reuse exisiting initialized object
Sh3llcod3 Oct 5, 2025
2efe3f8
Start cleaning up the code
Sh3llcod3 Oct 5, 2025
ad4751d
Continue cleaning up the code
Sh3llcod3 Oct 5, 2025
ac9455a
Further optimizations
Sh3llcod3 Oct 6, 2025
f123bdb
I think this is it on the optimization front
Sh3llcod3 Oct 6, 2025
25fd220
Move the curlopt to session
Sh3llcod3 Oct 7, 2025
fed0cbb
Improve robustness
Sh3llcod3 Oct 7, 2025
3fc339e
Finish the review
Sh3llcod3 Oct 7, 2025
f4bf9d6
Make sends fairer
Sh3llcod3 Oct 8, 2025
e23799f
Optimize further
Sh3llcod3 Oct 8, 2025
2741960
Make queue size larger
Sh3llcod3 Oct 9, 2025
dfcf71d
Remove closed check
Sh3llcod3 Oct 10, 2025
b20bcf4
Turns out the real optimization was the friends we made along the way
Sh3llcod3 Oct 10, 2025
2d0359b
Register the reader once
Sh3llcod3 Oct 10, 2025
01bc8d7
WHY is this faster, I don't get it man, what is life anymore
Sh3llcod3 Oct 10, 2025
5576cfc
Finish cleanup & review
Sh3llcod3 Oct 10, 2025
6f53e22
FUTURE FUTURE FUTURE
Sh3llcod3 Oct 10, 2025
ca5bf47
Merge branch 'lexiforest:main' into Improve-AsyncWebsocket
Sh3llcod3 Oct 11, 2025
9c08b7d
Cleanup code and attempt to fix tests
Sh3llcod3 Oct 11, 2025
b90b1bf
Fix race condition in close
Sh3llcod3 Oct 11, 2025
031ac79
Set it back
Sh3llcod3 Oct 11, 2025
b26b9a1
Every. Last. Optimization.
Sh3llcod3 Oct 11, 2025
141e691
Set nodelay by default
Sh3llcod3 Oct 11, 2025
f9121dc
Slight cleanups
Sh3llcod3 Oct 11, 2025
3bc812b
Slight cleanups
Sh3llcod3 Oct 12, 2025
a2db85a
Eliminate race conditions on close and terminate
Sh3llcod3 Oct 12, 2025
997c977
Cleanups and tweaks
Sh3llcod3 Oct 12, 2025
14fe143
Yield fix
Sh3llcod3 Oct 12, 2025
d325b0a
Merge branch 'lexiforest:main' into Improve-AsyncWebsocket
Sh3llcod3 Oct 13, 2025
395f1a7
Make sending based
Sh3llcod3 Oct 13, 2025
8ddbe78
Improve docstrings
Sh3llcod3 Oct 13, 2025
9017019
Increase RECV buffer
Sh3llcod3 Oct 13, 2025
8a471f5
Turns out setting buffer sizes was not a good idea after all
Sh3llcod3 Oct 13, 2025
989b198
Code is ready to review
Sh3llcod3 Oct 14, 2025
f8a769e
Make it even more robust
Sh3llcod3 Oct 14, 2025
1baa219
Finalize for review
Sh3llcod3 Oct 15, 2025
a855875
Remove the curlopt that I thought I removed
Sh3llcod3 Oct 15, 2025
7cf312c
Whoops - set curl max frame size properly
Sh3llcod3 Oct 15, 2025
7ef2d11
Write docs, client and server for first benchmark
Sh3llcod3 Oct 16, 2025
2a9f787
Lint
Sh3llcod3 Oct 17, 2025
bb270d1
Add fair scheduling changes and optimizations
Sh3llcod3 Oct 18, 2025
1c26dcb
Move yield mask to parameter
Sh3llcod3 Oct 18, 2025
0d3d39b
Optimize further
Sh3llcod3 Oct 18, 2025
ee0b802
We are reaching levels of optimization that shouldn't even be possible
Sh3llcod3 Oct 18, 2025
4c8b3df
Improve benchmark README
Sh3llcod3 Oct 19, 2025
445121f
Start adding in second benchmark
Sh3llcod3 Oct 19, 2025
d498d7f
Continue writing second benchmark
Sh3llcod3 Oct 20, 2025
0e2e03b
Complete initial version of second benchmark
Sh3llcod3 Oct 21, 2025
b2fe11d
Complete the benchmarks
Sh3llcod3 Oct 22, 2025
60146a2
Finalize the README
Sh3llcod3 Oct 22, 2025
e3db8d7
What CI/CD doing?
Sh3llcod3 Oct 23, 2025
728f048
We like the robustness
Sh3llcod3 Oct 23, 2025
6cbc79c
Improve docstrings
Sh3llcod3 Oct 23, 2025
007c1d4
Update docstrings
Sh3llcod3 Oct 23, 2025
e81f82b
Seperate out send and recv queue sizes for better control
Sh3llcod3 Nov 1, 2025
README.md (2 additions, 0 deletions)

@@ -252,8 +252,10 @@
from curl_cffi import AsyncSession
async with AsyncSession() as s:
ws = await s.ws_connect("wss://echo.websocket.org")
await asyncio.gather(*[ws.send_str("Hello, World!") for _ in range(10)])
await ws.flush()
async for message in ws:
print(message)
await ws.close()
```

## Ecosystem
benchmark/README.md (124 additions, 1 deletion)

@@ -22,5 +22,128 @@
Async clients
Target
------


All the clients run with their session/client objects enabled.

Async WebSocket
------

Two distinct benchmarks are provided to evaluate the performance of the `AsyncWebSocket` implementation under different conditions.

1. Simple Throughput Test ([`client`](ws_bench_1_client.py), [`server`](ws_bench_1_server.py))

This is a lightweight, in-memory benchmark designed to measure the raw throughput and overhead of the WebSocket client. The server sends a repeating chunk of random bytes from memory, and the client receives it. This test is useful for quick sanity checks and detecting performance regressions under ideal, CPU-cached conditions.
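
The server's send path can be pictured as a generator that reuses a single random chunk from memory. This is a hypothetical helper for illustration, not the actual `ws_bench_1_server.py` code:

```python
import os
from typing import Iterator


def make_chunks(total_bytes: int, chunk_size: int) -> Iterator[bytes]:
    """Yield one random chunk, generated once, until total_bytes is covered."""
    chunk = os.urandom(chunk_size)  # generated once, stays CPU-cached afterwards
    sent = 0
    while sent < total_bytes:
        n = min(chunk_size, total_bytes - sent)
        yield chunk[:n]  # only the final chunk may be shorter
        sent += n
```

Because the payload never changes, no disk reads or hashing compete for CPU time, which is what makes this benchmark measure client overhead rather than system throughput.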

2. Verified Streaming Test ([`benchmark`](ws_bench_2.py))

This is a rigorous, end-to-end test. It first generates a multi-gigabyte file of random data and its SHA256 hash. The benchmark then streams this file from disk over the WebSocket connection. The receiving end calculates the hash of the incoming stream and verifies it against the original, ensuring complete data integrity.

**Important**: This test requires free RAM equal to the size of the random data. It measures the performance of the entire system pipeline, including disk I/O, CPU hashing speed, and network transfer. On modern systems, the bottleneck is likely to be the CPU's hashing performance or the disk's read speed.
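
The integrity check boils down to incremental hashing, which keeps memory use constant regardless of file size. A minimal sketch (the helper name is illustrative):

```python
import hashlib
from typing import Iterable


def hash_stream(chunks: Iterable[bytes]) -> str:
    """Hash an iterable of byte chunks incrementally, in constant memory."""
    hasher = hashlib.sha256()
    for chunk in chunks:
        hasher.update(chunk)  # feed each chunk as it arrives off the wire
    return hasher.hexdigest()
```

The receiving end feeds each WebSocket message into the hasher as it arrives, then compares the final digest against the one recorded at generation time.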

### Prerequisites

- Python 3.10+
- Pip packages

```bash
pip install aiohttp curl_cffi
```

> `uvloop` is highly recommended for performance on Linux and macOS. The benchmarks will automatically fall back to the standard asyncio event loop if it is not installed or on Windows.
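
The fallback described above can be sketched as follows; the actual `get_loop` lives in `ws_bench_utils.py` and may differ in detail:

```python
import asyncio


def get_loop() -> asyncio.AbstractEventLoop:
    """Prefer uvloop's event loop when available, else use asyncio's default."""
    try:
        import uvloop  # not available on Windows; optional elsewhere

        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    except ImportError:
        pass  # fall back to the standard asyncio event loop
    return asyncio.new_event_loop()
```
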

### Setup

1. TLS certificate (optional)

These benchmarks are configured to use WSS (secure WebSockets) by default on Linux and macOS. To generate a self-signed certificate:

```bash
openssl req -x509 -newkey rsa:2048 -nodes -keyout localhost.key -out localhost.crt -days 365 -subj "/CN=localhost"
```

> **Note**: On any platform, if you skip certificate generation, the benchmarks fall back to the insecure `ws://` scheme.

2. Configuration

The benchmark parameters (total data size, chunk size) can be modified by editing the `TestConfig` class within the [`ws_bench_utils.py`](ws_bench_utils.py) file. By default, both benchmarks are configured for `10 GiB` of data transfer.
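
Assuming `TestConfig` is a plain dataclass, it might look like the sketch below; the field names mirror those referenced by the benchmark client, but the defaults and endpoint shown here are illustrative:

```python
from dataclasses import dataclass


@dataclass
class TestConfig:
    total_gb: int = 10                      # total data to transfer, in GiB
    chunk_size: int = 65535                 # bytes per WebSocket message
    send_queue: int = 64                    # send queue size (illustrative default)
    recv_queue: int = 64                    # recv queue size (illustrative default)
    srv_path: str = "wss://localhost:8765"  # illustrative server endpoint


config = TestConfig()  # shared module-level instance imported by the benchmarks
```
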

### Running the Benchmarks

It is recommended to run the server and client in separate terminal windows.

#### Benchmark 1: Simple Throughput Test

1. Start the Server:

```bash
python ws_bench_1_server.py
```

2. Run the Client:

```bash
python ws_bench_1_client.py
```

#### Benchmark 2: Verified Streaming Test

1. Generate Test File (Initial Setup):

This command will create a large (`10 GiB`) file named `testdata.bin` and its hash. Ensure you have sufficient disk space:

```bash
python ws_bench_2.py generate
```

2. Start the Server:

```bash
python ws_bench_2.py server
```

3. Run the Client (Choose one):

- To test download speed (server sends, client receives):

```bash
python ws_bench_2.py client --test download
```

- To test upload speed (client sends, server receives):

```bash
python ws_bench_2.py client --test upload
```

### Performance Considerations

Benchmark results can vary significantly based on system-level factors. The following should be kept in mind:

- **Loopback Interface**: These tests run on a local loopback interface (`127.0.0.1`), which does not represent real-world internet conditions (latency, packet loss, etc.).

- **CPU Affinity**: For maximum consistency, especially on multi-core or multi-CPU (NUMA) systems, you can pin the server and client processes to specific CPU cores. This avoids performance penalties from processes migrating between cores or crossing CPU socket boundaries.

**On Linux:**
Use `taskset` to specify a CPU core (e.g., core 0 for the server, core 1 for the client).

```bash
# Terminal 1
taskset -c 0 python ws_bench_1_server.py
# Terminal 2
taskset -c 1 python ws_bench_1_client.py
```

**On Windows:**
Use the `start /affinity` command (a CMD built-in; from PowerShell, prefix it with `cmd /c`). The affinity mask is a hexadecimal number (`1` for CPU 0, `2` for CPU 1, `4` for CPU 2, etc.).

```bat
:: CMD window 1
start /affinity 1 python ws_bench_1_server.py
:: CMD window 2
start /affinity 2 python ws_bench_1_client.py
```

- **Concurrent Tests**: In the first benchmark's client (`ws_bench_1_client.py`), a commented-out sender task can be uncommented to run the upload and download tests concurrently. Note that a concurrent run terminates as soon as the faster of the two directions (typically download) completes.

- **Queue Sizes**: Adjust the `send_queue` and `recv_queue` sizes within the [`TestConfig`](ws_bench_utils.py) class to observe the impact on performance and backpressure.
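
The effect of queue sizing can be illustrated with a plain bounded `asyncio.Queue`, assuming curl_cffi's send/recv queues behave similarly; this demo is not curl_cffi code:

```python
import asyncio


async def fill_bounded_queue(queue_size: int) -> int:
    """Fill a bounded queue without awaiting; count how many puts fit."""
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=queue_size)
    puts = 0
    while True:
        try:
            q.put_nowait(puts)  # succeeds until the queue reaches maxsize
            puts += 1
        except asyncio.QueueFull:
            # A real sender would `await q.put(...)` here and suspend:
            # that suspension is the backpressure on a fast producer.
            return puts


print(asyncio.run(fill_bounded_queue(4)))  # → 4
```

A larger queue buffers more messages before the producer suspends (more memory, smoother bursts); a smaller queue applies backpressure sooner.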
benchmark/ws_bench_1_client.py (178 additions, 0 deletions)

@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Websocket client simple benchmark - TLS (WSS)
"""

import time
from asyncio import (
FIRST_COMPLETED,
AbstractEventLoop,
CancelledError,
Task,
sleep,
wait,
)

from typing_extensions import Never
from ws_bench_utils import binary_data_generator, config, get_loop, logger

from curl_cffi import AsyncSession, AsyncWebSocket, WebSocketClosed


def calculate_stats(start_time: float, total_len: int) -> tuple[float, float]:
    """Calculate the elapsed time and the average throughput.

    Args:
        start_time (`float`): The start time from the performance counter.
        total_len (`int`): The total number of bytes transferred.

    Returns:
        `tuple[float, float]`: The duration in seconds and the rate in Gbps.
    """
    end_time: float = time.perf_counter()
    duration: float = end_time - start_time
    rate_gbps: float = (total_len * 8) / duration / (1024**3)
    return duration, rate_gbps


async def health_check() -> Never:
    """A simple coroutine that continuously prints a dot to prove that the event loop
    is alive and not starved from being able to run this task.

    Returns:
        Never: Keeps printing dots until the task is cancelled.
    """
    counter = 0
    logger.info("Starting sanity check. You should see a continuous stream of dots '.'")
    logger.info("If the dots stop for a long time, the event loop is blocked.")
    try:
        while True:
            await sleep(0.05)
            print(".", end="", flush=True)
            counter += 1
            if counter % 100 == 0:
                print("")
    finally:
        print("\r\x1b[K", end="")
        logger.info("Sanity check complete.")


async def ws_counter(ws: AsyncWebSocket) -> None:
    """Simple coroutine which counts how many bytes were received.

    Args:
        ws (`AsyncWebSocket`): Instantiated Curl CFFI AsyncWebSocket object.
    """
    recvd_len: int = 0
    start_time: float = time.perf_counter()
    logger.info("Receiving data from server")
    try:
        async for msg in ws:
            recvd_len += len(msg)

    except WebSocketClosed as exc:
        logger.debug(exc)

    finally:
        duration, avg_rate = calculate_stats(start_time, recvd_len)
        print("\r\x1b[K", end="")
        logger.info(
            "Received: %.2f GB in %.2f seconds", recvd_len / (1024**3), duration
        )
        logger.info("Average throughput (recv): %.2f Gbps", avg_rate)


async def ws_sender(ws: AsyncWebSocket) -> None:
    """Simple coroutine which just sends the same chunk of bytes until exhausted.

    Args:
        ws (`AsyncWebSocket`): Instantiated Curl CFFI AsyncWebSocket object.
    """
    sent_len: int = 0
    start_time: float = time.perf_counter()
    logger.info("Sending data to server")
    try:
        async for data_chunk in binary_data_generator(
            total_gb=config.total_gb, chunk_size=min(65535, config.chunk_size)
        ):
            _ = await ws.send(payload=data_chunk)
            sent_len += len(data_chunk)

    except WebSocketClosed as exc:
        logger.debug(exc)

    finally:
        duration, avg_rate = calculate_stats(start_time, sent_len)
        print("\r\x1b[K", end="")
        logger.info("Sent: %.2f GB in %.2f seconds", sent_len / (1024**3), duration)
        logger.info("Average throughput (send): %.2f Gbps", avg_rate)


async def run_benchmark(loop: AbstractEventLoop) -> None:
    """
    Simple client benchmark which sends/receives binary messages using curl-cffi.
    """
    logger.info("Starting curl-cffi benchmark")
    ws: AsyncWebSocket | None = None
    waiters: set[Task[None]] = set()
    try:
        async with AsyncSession(impersonate="chrome", verify=False) as session:
            ws = await session.ws_connect(
                config.srv_path,
                recv_queue_size=config.recv_queue,
                send_queue_size=config.send_queue,
            )
            logger.info("Connection established to %s", config.srv_path)

            # NOTE: Uncomment for send/recv benchmark or both
            waiters.add(loop.create_task(ws_counter(ws)))
            # waiters.add(loop.create_task(ws_sender(ws)))

            _, _ = await wait(waiters, return_when=FIRST_COMPLETED)

    except Exception:
        logger.exception("curl-cffi benchmark failed")
        raise

    finally:
        for wait_task in waiters:
            try:
                if not wait_task.done():
                    _ = wait_task.cancel()
                await wait_task

            except CancelledError:
                ...
        if ws:
            await ws.close(timeout=2)


async def main(loop: AbstractEventLoop) -> None:
    """Entrypoint"""
    waiters: set[Task[None]] = set()

    try:
        # Create the health check and benchmark tasks
        waiters.update(
            {loop.create_task(health_check()), loop.create_task(run_benchmark(loop))}
        )
        _, _ = await wait(waiters, return_when=FIRST_COMPLETED)

    except (KeyboardInterrupt, CancelledError):
        logger.debug("Cancelling benchmark")

    finally:
        for wait_task in waiters:
            try:
                if not wait_task.done():
                    _ = wait_task.cancel()
                await wait_task
            except CancelledError:
                ...


if __name__ == "__main__":
    evt_loop: AbstractEventLoop = get_loop()
    try:
        evt_loop.run_until_complete(main(evt_loop))
    finally:
        evt_loop.close()