Skip to content

Commit 7cb3e26

Browse files
committed
bookmarkmgr: cronet: Recycle executor and threads
1 parent d937656 commit 7cb3e26

File tree

3 files changed

+53
-45
lines changed

3 files changed

+53
-45
lines changed

bookmarkmgr/bookmarkmgr/__main__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import argparse
44
import asyncio
55
from collections.abc import Callable
6+
from concurrent.futures import ThreadPoolExecutor
67
from getpass import getpass
78
import logging
89
import signal
@@ -48,6 +49,13 @@ def parse(value: str) -> float | int | str:
4849

4950

5051
async def run_command(args: argparse.Namespace, raindrop_api_key: str) -> None:
52+
# Cronet may use a lot of threads.
53+
asyncio.get_running_loop().set_default_executor(
54+
ThreadPoolExecutor(
55+
sys.maxsize,
56+
),
57+
)
58+
5159
async with RaindropClient(raindrop_api_key) as raindrop_client:
5260
match args.command:
5361
case "export-collection":
Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
import asyncio
2-
from concurrent.futures import ThreadPoolExecutor
3-
from queue import Queue
4-
from typing import Any, Self, TYPE_CHECKING
5-
6-
if TYPE_CHECKING:
7-
from collections.abc import Awaitable
2+
from typing import Any, Self
83

4+
from bookmarkmgr.asyncio import ForgivingTaskGroup
95
from bookmarkmgr.cronet._cronet import ffi, lib
106
from bookmarkmgr.cronet.errors import NotContextManagerError
117
from bookmarkmgr.cronet.types import Executor, Runnable
@@ -16,15 +12,21 @@ def _executor_execute(executor: Executor, runnable: Runnable) -> None:
1612
manager: ExecutorManager = ffi.from_handle(
1713
lib.Cronet_Executor_GetClientContext(executor),
1814
)
19-
manager.enqueue_runnable(runnable)
15+
manager.submit_runnable(runnable)
16+
17+
18+
def _process_runnable(runnable: Runnable) -> None:
19+
try:
20+
lib.Cronet_Runnable_Run(runnable)
21+
finally:
22+
lib.Cronet_Runnable_Destroy(runnable)
2023

2124

2225
class ExecutorManager:
2326
def __init__(self) -> None:
24-
self._handle = ffi.new_handle(self)
25-
self._queue: Queue[Runnable | None] = Queue()
26-
self._worker: Awaitable[None] | None = None
2727
self._executor: Executor | None = None
28+
self._handle = ffi.new_handle(self)
29+
self._task_group: ForgivingTaskGroup | None = None
2830

2931
async def __aenter__(self) -> Self:
3032
if self._executor is None:
@@ -33,10 +35,11 @@ async def __aenter__(self) -> Self:
3335
)
3436
lib.Cronet_Executor_SetClientContext(self._executor, self._handle)
3537

36-
self._processing_allowed = True
38+
if self._task_group is None:
39+
self._task_group = ForgivingTaskGroup()
40+
await self._task_group.__aenter__()
3741

38-
if self._worker is None:
39-
self._worker = asyncio.create_task(self._spawn_worker_thread())
42+
self._processing_allowed = True
4043

4144
return self
4245

@@ -46,43 +49,36 @@ async def __aexit__(
4649
*args: Any, # noqa: PYI036
4750
**kwargs: Any,
4851
) -> None:
49-
self.shutdown(process_pending=exc_type is None)
52+
self._processing_allowed = False
5053

5154
try:
52-
if self._worker is not None:
53-
await self._worker
55+
if self._task_group is not None:
56+
await self._task_group.__aexit__(exc_type, *args, **kwargs)
57+
self._task_group = None
5458
finally:
5559
if self._executor is not None:
5660
lib.Cronet_Executor_Destroy(self._executor)
5761
self._executor = None
5862

59-
self._worker = None
63+
def submit_runnable(self, runnable: Runnable) -> None:
64+
if self._task_group is None:
65+
message = "ExecutorManager has not been entered"
66+
raise RuntimeError(message)
6067

61-
async def _spawn_worker_thread(self) -> None:
62-
with ThreadPoolExecutor() as pool:
63-
await asyncio.get_running_loop().run_in_executor(
64-
pool,
65-
self._worker_loop,
66-
)
68+
if not self._processing_allowed:
69+
lib.Cronet_Runnable_Destroy(runnable)
70+
return
6771

68-
def _worker_loop(self) -> None:
69-
while (runnable := self._queue.get()) is not None:
70-
try:
71-
if self._processing_allowed:
72-
lib.Cronet_Runnable_Run(runnable)
73-
finally:
74-
lib.Cronet_Runnable_Destroy(runnable)
75-
76-
def enqueue_runnable(self, runnable: Runnable) -> None:
77-
self._queue.put_nowait(runnable)
72+
self._task_group.create_task(
73+
asyncio.to_thread(
74+
_process_runnable,
75+
runnable,
76+
),
77+
)
7878

7979
@property
8080
def executor(self) -> Executor:
8181
if self._executor is None:
8282
raise NotContextManagerError
8383

8484
return self._executor
85-
86-
def shutdown(self, *, process_pending: bool = True) -> None:
87-
self._processing_allowed = process_pending
88-
self._queue.put_nowait(None)

bookmarkmgr/bookmarkmgr/cronet/session.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@
4949
class Session:
5050
def __init__(self) -> None:
5151
self._engine: Engine | None = None
52+
self._executor_manager = ExecutorManager()
5253

5354
async def __aenter__(self) -> Self:
55+
await self._executor_manager.__aenter__()
5456
self._open()
5557

5658
return self
@@ -60,7 +62,10 @@ async def __aexit__(
6062
*args: Any, # noqa: PYI036
6163
**kwargs: Any,
6264
) -> None:
63-
self.close()
65+
try:
66+
await self._executor_manager.__aexit__(*args, **kwargs)
67+
finally:
68+
self._close()
6469

6570
def _dispose_engine(self) -> None:
6671
if self._engine is None:
@@ -87,7 +92,7 @@ def _open(self) -> None:
8792
self._dispose_engine()
8893
raise
8994

90-
def close(self) -> None:
95+
def _close(self) -> None:
9196
if self._engine is None:
9297
return
9398

@@ -144,7 +149,6 @@ async def request(
144149
**kwargs,
145150
),
146151
) as callback_manager,
147-
ExecutorManager() as executor_manager,
148152
):
149153
lib.Cronet_UrlRequestParams_http_method_set(
150154
parameters,
@@ -170,7 +174,7 @@ async def request(
170174
url.encode(),
171175
parameters,
172176
callback_manager.callback,
173-
executor_manager.executor,
177+
self._executor_manager.executor,
174178
),
175179
)
176180

@@ -317,8 +321,8 @@ async def _request(self, *args: Any, **kwargs: Any) -> Response:
317321
async with self._RateLimiterMixin_rate_limiter:
318322
return await super()._request(*args, **kwargs)
319323

320-
def close(self) -> None:
321-
super().close()
324+
def _close(self) -> None:
325+
super()._close()
322326

323327
self._RateLimiterMixin_rate_limiter.close()
324328

@@ -357,8 +361,8 @@ async def _request(
357361
):
358362
return await super()._request(method, url, *args, **kwargs)
359363

360-
def close(self) -> None:
361-
super().close()
364+
def _close(self) -> None:
365+
super()._close()
362366

363367
for rate_limiter in self.__rate_limiters.values():
364368
rate_limiter.close()

0 commit comments

Comments
 (0)