Skip to content

Commit d0d9433

Browse files
committed
Utility to check memory usage of an object graph
1 parent e9973c4 commit d0d9433

File tree

6 files changed

+140
-10
lines changed

6 files changed

+140
-10
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,34 @@ Write out each step as a simple function (even a lambda), sync or async as
198198
is simpler, set up the Pipeline, and make a single call to have
199199
everything done.
200200

201+
```python
202+
203+
from extraasync import Pipeline
204+
import httpx
205+
206+
urls = ["https://example.com/data1", "https://example.com/data2", ...] # (X 100s)
207+
208+
async def fetch(url):
    # Use the client as an async context manager so that its connection
    # pool is closed as soon as the response body has been read; a bare
    # ``httpx.AsyncClient()`` per URL would otherwise never be closed.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text
211+
212+
async def extract(html):
213+
return re.findall(r"\<h1\>(.+)?\<\/h1\>", html)
214+
215+
async def record(msg):
216+
with open("output.txt", "at") as f:
217+
f.write(msg + "\n")
218+
201219

220+
async def main():
221+
await Pipeline(urls, fetch, extract, record, max_concurrency=30, rate_limit=10)
222+
223+
224+
# Or, the fancy style:
225+
226+
from extraasync.pipeline import process
227+
228+
async def fancy_main():
229+
await (urls >> Pipeline(rate_limit=10) | fetch | extract | record)
230+
231+
```

extraasync/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77
__version__ = "0.3.0"
88

99

10-
__all__ = ["aenumerate", "ExtraTaskGroup", "sync_to_async", "async_to_sync", "at_loop_stop_callback", "Pipeline", "remove_loop_stop_callback", "RateLimiter", "__version__"]
10+
__all__ = ["aenumerate", "ExtraTaskGroup", "sync_to_async", "async_to_sync",
11+
"at_loop_stop_callback", "Pipeline", "remove_loop_stop_callback", "RateLimiter", "__version__"]

extraasync/aenumerate.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
class aenumerate:
2+
"""
3+
Asynchronous version of enumerate:
4+
5+
can be used in async-for loops
6+
"""
7+
28
def __init__(self, iterable, start=None):
39
self.iterable = iterable
410
self.start = start or 0
@@ -19,4 +25,3 @@ async def __aiter__(self):
1925

2026
def __repr__(self):
2127
return f"Asynchronous enumeration for {self.iterable!r} at index {self.index!r}"
22-

extraasync/pipeline.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class AutoSet(MutableSet):
7676
Whenever an item is removed/discarded, an item is fetched from the queue as
7777
a task factory-no argument callable: it is called and added to the set.
7878
79-
8079
"""
8180

8281
def __init__(self, initial=()):
@@ -189,7 +188,8 @@ def normalized(self):
189188
value /= 24 * 3600
190189
case _:
191190
raise ValueError(
192-
f"Invalid time unit for frequency throtle - should be one of {TIME_UNIT}"
191+
f"Invalid time unit for frequency throtle - should be one of {
192+
TIME_UNIT}"
193193
)
194194
return 1 / value
195195

@@ -282,7 +282,8 @@ def put(self, value: tuple[int, t.Any]):
282282
if self.max_concurrency in (None, 0) or len(self.tasks) < self.max_concurrency:
283283
self.tasks.add(self._create_task(value))
284284
else:
285-
self.tasks.queue.put_nowait(lambda value=value: self._create_task(value))
285+
self.tasks.queue.put_nowait(
286+
lambda value=value: self._create_task(value))
286287

287288
def __call__(self, value):
288289
"just run the stage"
@@ -342,7 +343,8 @@ def __init__(
342343
"""
343344
self.max_concurrency = max_concurrency
344345
self.data = (
345-
_as_async_iterable(source) if source not in (None, Placeholder) else None
346+
_as_async_iterable(source) if source not in (
347+
None, Placeholder) else None
346348
)
347349
self.preserve_order = preserve_order
348350
# TBD: maybe allow limiting total memory usage instead of elements in the pipeline?
@@ -380,7 +382,8 @@ def _create_stages(self, stages):
380382

381383
def reset(self):
382384
self.ordered_results = Heap()
383-
self.output: asyncio.Queue[tuple[int | EXC_MARKER], t.Any] = asyncio.Queue()
385+
self.output: asyncio.Queue[tuple[int |
386+
EXC_MARKER], t.Any] = asyncio.Queue()
384387
self._create_stages(self.raw_stages)
385388
self.count = 0
386389

@@ -435,7 +438,8 @@ async def __aiter__(self):
435438
elif self.on_error == "strict":
436439
raise result_data[1]
437440
elif self.on_error == "lazy":
438-
raise NotImplementedError("Lazy error raising in pipeline")
441+
raise NotImplementedError(
442+
"Lazy error raising in pipeline")
439443
if not self.preserve_order:
440444
yield result_data
441445
else:

extraasync/utils.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import asyncio
2+
import sys
3+
from array import array
4+
from collections.abc import Callable
5+
from types import FrameType, ModuleType, CodeType, CoroutineType
6+
from datetime import datetime, date, timedelta
7+
8+
9+
# Concrete type of ``frame.f_locals`` on the running interpreter, probed once
# at import time from a throwaway lambda frame (a plain ``dict`` on older
# interpreters, a dedicated proxy class where PEP 667 is implemented).
FrameLocalsProxy = (lambda: type(sys._getframe().f_locals))()

# Leaf types: measured with sys.getsizeof() only, their contents are not visited.
DontRecurseTypes = (
    float, int, complex, bool, datetime, date, timedelta,
    str, bytes, bytearray, array, FrameType,
)
# Code-like objects that are not instance data: they contribute 0 bytes.
ZeroCheckTypes = ModuleType, Callable, type, CodeType
# Native objects that may expose neither __dict__ nor __slots__: as a last
# resort their attributes are discovered through dir().
InspectAllAttrs = asyncio.Future, CoroutineType


def sizeof(obj, seen=None):
    """Recursively find the approximate size, in bytes, of a Python object graph.

    The total includes every referenced non-class object, plus data held in
    the local variables of coroutine and generator frames.

    This is not perfect. Nothing of this type will ever be perfect -
    recursively trying to find all attributes referenced by an object will
    usually get out of hand.
    Maybe use the GC to try to filter objects referenced only in the
    requested graph?

    Nonetheless this implementation includes the size of objects currently
    held in local variables inside asyncio tasks. For the typical workload
    here, it sums up the sizes of eventual API or HTTP request responses
    that are in temporary use in pipeline modules.

    Parameters:
        obj: root of the object graph to measure.
        seen: set of ``id()`` values already accounted for. Callers normally
            leave it as ``None``; it is threaded through the recursion so
            that every object is counted at most once.

    Returns:
        The accumulated ``sys.getsizeof()`` of every object reached, as an
        int. Modules, callables, classes and code objects count as 0, and so
        does any object whose id is already in ``seen``.
    """
    if seen is None:
        seen = set()
        try:
            # The running event loop is infrastructure, not payload: mark it
            # as visited up front so it never contributes to the total.
            seen.add(id(asyncio.get_running_loop()))
        except RuntimeError:
            # Not called from inside a running event loop.
            pass
    if id(obj) in seen:
        return 0
    size = sys.getsizeof(obj)
    seen.add(id(obj))
    if isinstance(obj, ZeroCheckTypes):
        # we just want instance data
        return 0

    if isinstance(obj, FrameType) and obj.f_locals is not obj.f_globals:
        # Share the visited set with the locals: starting a fresh one here
        # would count again every object that was already measured.
        size += sizeof(obj.f_locals, seen)

    if isinstance(obj, DontRecurseTypes):
        return size

    if isinstance(obj, dict):
        # Only real dicts get key+value treatment; for user-defined Sequences,
        # Mappings, etc. the contents are found by the generic checks below.
        for key, value in obj.items():
            size += sizeof(key, seen)
            size += sizeof(value, seen)
    elif isinstance(obj, FrameLocalsProxy):
        for value in obj.values():
            size += sizeof(value, seen)
    elif hasattr(type(obj), "__len__") and not isinstance(obj, range):
        # Take sized containers; generators are skipped. If the items are
        # generated on the fly, their sizes are (incorrectly) added as well.
        try:
            items = iter(obj)
        except TypeError:
            # Defines __len__ but is not iterable: nothing to walk.
            items = ()
        for item in items:
            size += sizeof(item, seen)

    instance_checked = False
    if hasattr(obj, "__dict__"):
        size += sizeof(obj.__dict__, seen)
        instance_checked = True

    # Walk the whole MRO: each class only lists the slots it declares itself,
    # so looking at ``obj.__slots__`` alone would miss inherited ones.
    for klass in type(obj).__mro__:
        slots = vars(klass).get("__slots__")
        if slots is None:
            continue
        instance_checked = True
        if isinstance(slots, str):
            # ``__slots__ = "name"`` declares one slot, not one per character.
            slots = (slots,)
        for slot in slots:
            try:
                value = getattr(obj, slot)
            except AttributeError:
                # Slot declared but never assigned on this instance.
                continue
            size += sizeof(value, seen)

    # Coroutines, generators and async generators keep their live data in a
    # suspended frame: use the first frame attribute that is set.
    frame = None
    for frame_attr in ("cr_frame", "gi_frame", "ag_frame"):
        frame = getattr(obj, frame_attr, None)
        if frame:
            break
    if isinstance(frame, FrameType):
        size += sizeof(frame.f_locals, seen)
        instance_checked = True

    if not instance_checked and isinstance(obj, InspectAllAttrs):
        # Certain objects created in native code, such as asyncio.Tasks, have
        # neither a __dict__ nor __slots__, nor will "vars()" work on them.
        # DOUBT: maybe use an allow-list here, instead of introspecting everything?
        cls = type(obj)
        for attrname in dir(obj):
            try:
                attr = getattr(obj, attrname)
            except AttributeError:
                # Listed by dir() but not retrievable on this instance.
                continue
            clsattr = getattr(cls, attrname, None)
            if attr is clsattr or callable(attr):
                # skip methods and class attributes
                continue
            size += sizeof(attr, seen)

    return size

tests/test_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,12 +544,12 @@ async def test_pipeline_add_data_and_execute_l_rhift_operator(): ...
544544

545545
@pytest.mark.skip
546546
@pytest.mark.asyncio
547-
async def test_pipeline_store_result_r_rshift_operator(): ...
547+
async def test_pipeline_fine_tune_stages(): ...
548548

549549

550550
@pytest.mark.skip
551551
@pytest.mark.asyncio
552-
async def test_pipeline_fine_tune_stages(): ...
552+
async def test_pipeline_retry_simple(): ...
553553

554554

555555
@pytest.mark.asyncio

0 commit comments

Comments
 (0)