Skip to content

Commit 617b804

Browse files
committed
Pipeline signature: move source to first argument
1 parent e322658 commit 617b804

File tree

2 files changed

+52
-52
lines changed

2 files changed

+52
-52
lines changed

extraasync/pipeline.py

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@
2626
EOD = object()
2727
EXC_MARKER = object()
2828

29+
try:
30+
from functools import Placeholder # new in Python 3.14
31+
except ImportError:
32+
# Pickleable sentinel:
33+
34+
class Placeholder:
35+
pass
36+
2937

3038
class Heap:
3139
def __init__(self):
@@ -48,11 +56,12 @@ def __bool__(self):
4856
def __repr__(self):
4957
return f"<Heap {self.data!r}>"
5058

59+
5160
class AutoSet(MutableSet):
5261
"""Set with an associated asyncio.Queue
5362
54-
WHenever an item is removed/discarded, an item is fetched from the queue as
55-
a task factory - no argument callable: it is called and added to the set.
63+
Whenever an item is removed/discarded, an item is fetched from the queue as
64+
a task factory (a no-argument callable): it is called and added to the set.
5665
5766
5867
"""
@@ -169,6 +178,10 @@ def __call__(self, value):
169178
"just run the stage"
170179
return self.code(value)
171180

181+
def __repr__(self):
182+
return f"{self.__class__.__name__}{self.code}"
183+
184+
172185

173186
class Pipeline:
174187
"""
@@ -185,32 +198,32 @@ class Pipeline:
185198

186199
def __init__(
187200
self,
201+
source: t.Optional[t.AsyncIterable[T] | t.Iterable[T]],
188202
*stages: t.Sequence[t.Callable | Stage],
189-
data: t.Optional[t.AsyncIterable[T] | t.Iterable[T]],
190203
max_concurrency: t.Optional[int] = None,
191204
on_error: PipelineErrors = "strict",
192205
preserve_order: bool = False,
193206
max_simultaneous_records: t.Optional[int] = None,
194207
):
195-
"""
196-
Args:
197-
- stages: One async or sync callable which will process one data item at a time
198-
- TBD? accept generators as stages? (input data would be ".send"ed into it)
199-
- data: async or sync generator representing the data source
200-
- max_concurrency: Maximum number of concurrent tasks _for_ _each_ stage
201-
(i.e. if there are 2 stages, and max_concurrency is set to 4, we may have
202-
up to 8 concurrent tasks running at once in the pipeline, but each stage is
203-
limited to 4)
204-
- on_error: WHat to do if any stage raises an exeception - defaults to re-raise the
205-
exception and stop the whole pipeline
206-
- preserve_order: whether to yield the final results in the same order they were acquired from data.
207-
- max_simultaneous_records: limit on amount of records to hold across all stages and input in internal
208-
data structures: the idea is throtle data consumption in order to limit the
209-
amount of memory used by the Pipeline
208+
"""
209+
Args:
210+
- stages: One async or sync callable which will process one data item at a time
211+
- TBD? accept generators as stages? (input data would be ".send"ed into it)
212+
- source: async or sync generator representing the data source
213+
- max_concurrency: Maximum number of concurrent tasks _for_ _each_ stage
214+
(i.e. if there are 2 stages, and max_concurrency is set to 4, we may have
215+
up to 8 concurrent tasks running at once in the pipeline, but each stage is
216+
limited to 4)
217+
- on_error: What to do if any stage raises an exception - defaults to re-raise the
218+
exception and stop the whole pipeline
219+
- preserve_order: whether to yield the final results in the same order they were acquired from data.
220+
- max_simultaneous_records: limit on amount of records to hold across all stages and input in internal
221+
data structures: the idea is to throttle data consumption in order to limit the
222+
amount of memory used by the Pipeline
210223
211-
"""
224+
"""
212225
self.max_concurrency = max_concurrency
213-
self.data = _as_async_iterable(data)
226+
self.data = _as_async_iterable(source) if source not in (None, Placeholder) else None
214227
self.preserve_order = preserve_order
215228
# TBD: maybe allow limitting total memory usage instead of elements in the pipeline?
216229
self.max_simultaneous_records = max_simultaneous_records

tests/test_pipeline.py

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def map_function(n, map_interval=0):
2121
return n * 2
2222

2323
results = []
24-
async for result in Pipeline(map_function, data=producer(10)):
24+
async for result in Pipeline(producer(10), map_function):
2525
results.append(result)
2626

2727
assert results == list(range(0, 20, 2))
@@ -38,7 +38,7 @@ def map_function(n, map_interval=0):
3838
return n * 2
3939

4040
results = []
41-
async for result in Pipeline(map_function, data=producer(10)):
41+
async for result in Pipeline(producer(10), map_function):
4242
results.append(result)
4343

4444
assert results == list(range(0, 20, 2))
@@ -59,7 +59,7 @@ async def f2(v):
5959
return v + 1
6060

6161
results = []
62-
async for result in Pipeline(map_function, f2, data=producer(10)):
62+
async for result in Pipeline(producer(10), map_function, f2):
6363
results.append(result)
6464

6565
assert results == [n * 2 + 1 for n in range(10)]
@@ -88,7 +88,7 @@ async def map_function(n):
8888

8989
results = []
9090
async for result in Pipeline(
91-
map_function, data=producer(task_amount), max_concurrency=None
91+
producer(task_amount), map_function, max_concurrency=None
9292
):
9393
results.append(result)
9494

@@ -130,7 +130,7 @@ async def map_function(n):
130130

131131
results = []
132132
async for result in Pipeline(
133-
map_function, data=producer(task_amount), max_concurrency=max_concurrency
133+
producer(task_amount), map_function, max_concurrency=max_concurrency
134134
):
135135
results.append(result)
136136

@@ -216,7 +216,9 @@ async def stage(value):
216216
await asyncio.sleep(int(f"{value:02d}"[::-1]) / 200)
217217
return value
218218

219-
assert await Pipeline(stage, data=generator(100), preserve_order=True).results() == list(range(100))
219+
assert await Pipeline(generator(100), stage, preserve_order=True).results() == list(
220+
range(100)
221+
)
220222

221223

222224
@pytest.mark.asyncio
@@ -234,7 +236,9 @@ async def map_function(n):
234236
try:
235237
async with asyncio.timeout(0.1):
236238
try:
237-
async for result in Pipeline(map_function, data=producer(1), on_error="strict"):
239+
async for result in Pipeline(
240+
producer(1), map_function, on_error="strict"
241+
):
238242
results.append(result)
239243
except* Exception:
240244
# for this specific test, any other outcome is good.
@@ -276,51 +280,34 @@ async def map_function(n):
276280

277281
@pytest.mark.skip
278282
@pytest.mark.asyncio
279-
async def test_pipeline_reorder_results():
280-
...
283+
async def test_pipeline_reorder_results(): ...
284+
281285

282286
@pytest.mark.skip
283287
@pytest.mark.asyncio
284-
async def test_pipeline_add_stage_pipe_operator():
285-
286-
...
287-
288+
async def test_pipeline_add_stage_pipe_operator(): ...
288289

289290

290291
@pytest.mark.skip
291292
@pytest.mark.asyncio
292-
async def test_pipeline_add_data_and_execute_l_rhift_operator():
293-
294-
...
293+
async def test_pipeline_add_data_and_execute_l_rhift_operator(): ...
295294

296295

297296
@pytest.mark.skip
298297
@pytest.mark.asyncio
299-
async def test_pipeline_store_result_r_rshift_operator():
300-
...
298+
async def test_pipeline_store_result_r_rshift_operator(): ...
301299

302300

303301
@pytest.mark.skip
304302
@pytest.mark.asyncio
305-
async def test_pipeline_fine_tune_stages():
306-
...
303+
async def test_pipeline_fine_tune_stages(): ...
307304

308305

309306
@pytest.mark.skip
310307
@pytest.mark.asyncio
311-
async def test_pipeline_concurrency_rate_limit():
312-
...
308+
async def test_pipeline_concurrency_rate_limit(): ...
313309

314310

315311
@pytest.mark.skip
316312
@pytest.mark.asyncio
317-
async def test_pipeline_max_simultaneous_record_limit():
318-
...
319-
320-
321-
322-
323-
324-
325-
326-
313+
async def test_pipeline_max_simultaneous_record_limit(): ...

0 commit comments

Comments
 (0)