@@ -173,8 +173,8 @@ def __call__(self, value):
173
173
class Pipeline :
174
174
"""
175
175
Pipeline class
176
- Will enable mapping data from an iterator source to be passed down various stages
177
- of execution, where the result of each estage is fed to the next one
176
+ Will enable mapping data from an iterable source to be passed down various stages
177
+ of execution, where the result of each stage is fed to the next one
178
178
179
179
The difference for just calling one (or more) stages inline in a for function
180
180
that pipeline allows for fine grained concurrency specification and error handling
@@ -192,6 +192,23 @@ def __init__(
192
192
preserve_order : bool = False ,
193
193
max_simultaneous_records : t .Optional [int ] = None ,
194
194
):
195
+ """
196
+ Args:
197
+ - stages: One async or sync callable which will process one data item at a time
198
+ - TBD? accept generators as stages? (input data would be ".send"ed into it)
199
+ - data: async or sync generator representing the data source
200
+ - max_concurrency: Maximum number of concurrent tasks _for_ _each_ stage
201
+ (i.e. if there are 2 stages, and max_concurrency is set to 4, we may have
202
+ up to 8 concurrent tasks running at once in the pipeline, but each stage is
203
+ limited to 4)
204
+ - on_error: What to do if any stage raises an exception - defaults to re-raise the
205
+ exception and stop the whole pipeline
206
+ - preserve_order: whether to yield the final results in the same order they were acquired from data.
207
+ - max_simultaneous_records: limit on amount of records to hold across all stages and input in internal
208
+ data structures: the idea is to throttle data consumption in order to limit the
209
+ amount of memory used by the Pipeline
210
+
211
+ """
195
212
self .max_concurrency = max_concurrency
196
213
self .data = _as_async_iterable (data )
197
214
self .preserve_order = preserve_order
0 commit comments