Skip to content

Commit b029df5

Browse files
authored
Updated the to_interpreter module to use the public API on Python 3.14 (#956)
1 parent 01f02cf commit b029df5

File tree

3 files changed

+143
-126
lines changed

3 files changed

+143
-126
lines changed

docs/versionhistory.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
3434
Christoffer Fjord)
3535
- Added a documentation page explaining why one might want to use AnyIO's APIs instead of
3636
asyncio's
37+
- Updated the ``to_interpreter`` module to use the public ``concurrent.interpreters``
38+
API on Python 3.14 or later
3739
- Fixed ``anyio.Path.copy()`` and ``anyio.Path.copy_into()`` failing on Python 3.14.0a7
3840
- Fixed return annotation of ``__aexit__`` on async context managers. CMs which can
3941
suppress exceptions should return ``bool``, or ``None`` otherwise.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ classifiers = [
2323
"Programming Language :: Python :: 3.11",
2424
"Programming Language :: Python :: 3.12",
2525
"Programming Language :: Python :: 3.13",
26+
"Programming Language :: Python :: 3.14",
2627
]
2728
requires-python = ">= 3.9"
2829
dependencies = [

src/anyio/to_interpreter.py

Lines changed: 140 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
import atexit
44
import os
5-
import pickle
65
import sys
76
from collections import deque
87
from collections.abc import Callable
9-
from textwrap import dedent
108
from typing import Any, Final, TypeVar
119

1210
from . import current_time, to_thread
@@ -19,127 +17,144 @@
1917
else:
2018
from typing_extensions import TypeVarTuple, Unpack
2119

22-
UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib
23-
FMT_UNPICKLED: Final = 0
24-
FMT_PICKLED: Final = 1
25-
DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value
26-
MAX_WORKER_IDLE_TIME = (
27-
30 # seconds a subinterpreter can be idle before becoming eligible for pruning
28-
)
29-
QUEUE_PICKLE_ARGS: Final = (
30-
(UNBOUND,) if sys.version_info >= (3, 14, 0, "beta", 2) else (FMT_PICKLED, UNBOUND)
31-
)
32-
QUEUE_UNPICKLE_ARGS: Final = (
33-
(UNBOUND,)
34-
if sys.version_info >= (3, 14, 0, "beta", 2)
35-
else (FMT_UNPICKLED, UNBOUND)
36-
)
20+
if sys.version_info >= (3, 14):
21+
from concurrent.interpreters import ExecutionFailed, create
3722

38-
T_Retval = TypeVar("T_Retval")
39-
PosArgsT = TypeVarTuple("PosArgsT")
40-
41-
_idle_workers = RunVar[deque["Worker"]]("_available_workers")
42-
_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
43-
44-
45-
class Worker:
46-
_run_func = compile(
47-
dedent("""
48-
import _interpqueues as queues
49-
import _interpreters as interpreters
50-
from pickle import loads, dumps, HIGHEST_PROTOCOL
51-
52-
item = queues.get(queue_id)[0]
23+
def _interp_call(func: Callable[..., Any], args: tuple[Any, ...]):
5324
try:
54-
func, args = loads(item)
5525
retval = func(*args)
5626
except BaseException as exc:
57-
is_exception = True
58-
retval = exc
27+
return exc, True
5928
else:
60-
is_exception = False
29+
return retval, False
6130

62-
try:
63-
queues.put(queue_id, (retval, is_exception), *QUEUE_UNPICKLE_ARGS)
64-
except interpreters.NotShareableError:
65-
retval = dumps(retval, HIGHEST_PROTOCOL)
66-
queues.put(queue_id, (retval, is_exception), *QUEUE_PICKLE_ARGS)
67-
"""),
31+
class Worker:
32+
last_used: float = 0
33+
34+
def __init__(self) -> None:
35+
self._interpreter = create()
36+
37+
def destroy(self) -> None:
38+
self._interpreter.close()
39+
40+
def call(
41+
self,
42+
func: Callable[..., T_Retval],
43+
args: tuple[Any, ...],
44+
) -> T_Retval:
45+
try:
46+
res, is_exception = self._interpreter.call(_interp_call, func, args)
47+
except ExecutionFailed as exc:
48+
raise BrokenWorkerInterpreter(exc.excinfo) from exc
49+
50+
if is_exception:
51+
raise res
52+
53+
return res
54+
elif sys.version_info >= (3, 13):
55+
import _interpqueues
56+
import _interpreters
57+
58+
UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib
59+
FMT_UNPICKLED: Final = 0
60+
FMT_PICKLED: Final = 1
61+
QUEUE_PICKLE_ARGS: Final = (FMT_PICKLED, UNBOUND)
62+
QUEUE_UNPICKLE_ARGS: Final = (FMT_UNPICKLED, UNBOUND)
63+
64+
_run_func = compile(
65+
"""
66+
import _interpqueues
67+
from _interpreters import NotShareableError
68+
from pickle import loads, dumps, HIGHEST_PROTOCOL
69+
70+
QUEUE_PICKLE_ARGS = (1, 2)
71+
QUEUE_UNPICKLE_ARGS = (0, 2)
72+
73+
item = _interpqueues.get(queue_id)[0]
74+
try:
75+
func, args = loads(item)
76+
retval = func(*args)
77+
except BaseException as exc:
78+
is_exception = True
79+
retval = exc
80+
else:
81+
is_exception = False
82+
83+
try:
84+
_interpqueues.put(queue_id, (retval, is_exception), *QUEUE_UNPICKLE_ARGS)
85+
except NotShareableError:
86+
retval = dumps(retval, HIGHEST_PROTOCOL)
87+
_interpqueues.put(queue_id, (retval, is_exception), *QUEUE_PICKLE_ARGS)
88+
""",
6889
"<string>",
6990
"exec",
7091
)
7192

72-
last_used: float = 0
73-
74-
_initialized: bool = False
75-
_interpreter_id: int
76-
_queue_id: int
77-
78-
def initialize(self) -> None:
79-
import _interpqueues as queues
80-
import _interpreters as interpreters
81-
82-
self._interpreter_id = interpreters.create()
83-
self._queue_id = queues.create(2, *QUEUE_UNPICKLE_ARGS)
84-
self._initialized = True
85-
interpreters.set___main___attrs(
86-
self._interpreter_id,
87-
{
88-
"queue_id": self._queue_id,
89-
"QUEUE_PICKLE_ARGS": QUEUE_PICKLE_ARGS,
90-
"QUEUE_UNPICKLE_ARGS": QUEUE_UNPICKLE_ARGS,
91-
},
92-
)
93+
class Worker:
94+
last_used: float = 0
95+
96+
def __init__(self) -> None:
97+
self._interpreter_id = _interpreters.create()
98+
self._queue_id = _interpqueues.create(1, *QUEUE_UNPICKLE_ARGS)
99+
_interpreters.set___main___attrs(
100+
self._interpreter_id, {"queue_id": self._queue_id}
101+
)
102+
103+
def destroy(self) -> None:
104+
_interpqueues.destroy(self._queue_id)
105+
_interpreters.destroy(self._interpreter_id)
106+
107+
def call(
108+
self,
109+
func: Callable[..., T_Retval],
110+
args: tuple[Any, ...],
111+
) -> T_Retval:
112+
import pickle
113+
114+
item = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
115+
_interpqueues.put(self._queue_id, item, *QUEUE_PICKLE_ARGS)
116+
exc_info = _interpreters.exec(self._interpreter_id, _run_func)
117+
if exc_info:
118+
raise BrokenWorkerInterpreter(exc_info)
119+
120+
res = _interpqueues.get(self._queue_id)
121+
(res, is_exception), fmt = res[:2]
122+
if fmt == FMT_PICKLED:
123+
res = pickle.loads(res)
124+
125+
if is_exception:
126+
raise res
127+
128+
return res
129+
else:
93130

94-
def destroy(self) -> None:
95-
import _interpqueues as queues
96-
import _interpreters as interpreters
97-
98-
if self._initialized:
99-
interpreters.destroy(self._interpreter_id)
100-
queues.destroy(self._queue_id)
101-
102-
def _call(
103-
self,
104-
func: Callable[..., T_Retval],
105-
args: tuple[Any],
106-
) -> tuple[Any, bool]:
107-
import _interpqueues as queues
108-
import _interpreters as interpreters
109-
110-
if not self._initialized:
111-
self.initialize()
112-
113-
payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
114-
queues.put(self._queue_id, payload, *QUEUE_PICKLE_ARGS)
115-
116-
res: Any
117-
is_exception: bool
118-
if exc_info := interpreters.exec(self._interpreter_id, self._run_func):
119-
raise BrokenWorkerInterpreter(exc_info)
120-
121-
(res, is_exception), fmt = queues.get(self._queue_id)[:2]
122-
if fmt == FMT_PICKLED:
123-
res = pickle.loads(res)
124-
125-
return res, is_exception
126-
127-
async def call(
128-
self,
129-
func: Callable[..., T_Retval],
130-
args: tuple[Any],
131-
limiter: CapacityLimiter,
132-
) -> T_Retval:
133-
result, is_exception = await to_thread.run_sync(
134-
self._call,
135-
func,
136-
args,
137-
limiter=limiter,
138-
)
139-
if is_exception:
140-
raise result
131+
class Worker:
132+
last_used: float = 0
133+
134+
def __init__(self) -> None:
135+
raise RuntimeError("subinterpreters require at least Python 3.13")
136+
137+
def call(
138+
self,
139+
func: Callable[..., T_Retval],
140+
args: tuple[Any, ...],
141+
) -> T_Retval:
142+
raise NotImplementedError
143+
144+
def destroy(self) -> None:
145+
pass
146+
147+
148+
DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value
149+
MAX_WORKER_IDLE_TIME = (
150+
30 # seconds a subinterpreter can be idle before becoming eligible for pruning
151+
)
152+
153+
T_Retval = TypeVar("T_Retval")
154+
PosArgsT = TypeVarTuple("PosArgsT")
141155

142-
return result
156+
_idle_workers = RunVar[deque[Worker]]("_available_workers")
157+
_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
143158

144159

145160
def _stop_workers(workers: deque[Worker]) -> None:
@@ -157,25 +172,19 @@ async def run_sync(
157172
"""
158173
Call the given function with the given arguments in a subinterpreter.
159174
160-
If the ``cancellable`` option is enabled and the task waiting for its completion is
161-
cancelled, the call will still run its course but its return value (or any raised
162-
exception) will be ignored.
163-
164-
.. warning:: This feature is **experimental**. The upstream interpreter API has not
165-
yet been finalized or thoroughly tested, so don't rely on this for anything
166-
mission critical.
175+
.. warning:: On Python 3.13, the :mod:`concurrent.interpreters` module was not yet
176+
available, so the code path for that Python version relies on an undocumented,
177+
private API. As such, it is recommended not to rely on this function for anything
178+
mission-critical on Python 3.13.
167179
168180
:param func: a callable
169-
:param args: positional arguments for the callable
170-
:param limiter: capacity limiter to use to limit the total amount of subinterpreters
181+
:param args: the positional arguments for the callable
182+
:param limiter: capacity limiter to use to limit the total number of subinterpreters
171183
running (if omitted, the default limiter is used)
172184
:return: the result of the call
173185
:raises BrokenWorkerInterpreter: if there's an internal error in a subinterpreter
174186
175187
"""
176-
if sys.version_info <= (3, 13):
177-
raise RuntimeError("subinterpreters require at least Python 3.13")
178-
179188
if limiter is None:
180189
limiter = current_default_interpreter_limiter()
181190

@@ -193,7 +202,12 @@ async def run_sync(
193202
worker = Worker()
194203

195204
try:
196-
return await worker.call(func, args, limiter)
205+
return await to_thread.run_sync(
206+
worker.call,
207+
func,
208+
args,
209+
limiter=limiter,
210+
)
197211
finally:
198212
# Prune workers that have been idle for too long
199213
now = current_time()
@@ -209,8 +223,8 @@ async def run_sync(
209223

210224
def current_default_interpreter_limiter() -> CapacityLimiter:
211225
"""
212-
Return the capacity limiter that is used by default to limit the number of
213-
concurrently running subinterpreters.
226+
Return the capacity limiter used by default to limit the number of concurrently
227+
running subinterpreters.
214228
215229
Defaults to the number of CPU cores.
216230

0 commit comments

Comments (0)