Commit e9cc89d

Merge pull request #512 from Neuraxio/add-windows-compatibility
Add Windows support + do some cleaning
2 parents 1148253 + bf0f14d commit e9cc89d

File tree: 11 files changed, +369 −360 lines


examples/parallel/plot_streaming_pipeline.py

Lines changed: 52 additions & 32 deletions

@@ -3,9 +3,27 @@
 ===================================================================
 
 This demonstrates how to stream data in parallel in a Neuraxle pipeline.
+The pipeline steps' parallelism here will be obvious.
+
+The pipeline has two steps:
+1. Preprocessing: the step that processes the data simply sleeps.
+2. Model: the model simply multiplies the data by two.
+
+This can be used with scikit-learn as well to transform things in parallel,
+and any other library such as tensorflow.
+
+Pipelines benchmarked:
+1. We first use a classical pipeline and evaluate the time.
+2. Then we use a minibatched pipeline and we evaluate the time.
+3. Then we use a parallel pipeline and we evaluate the time.
+
+We expect the parallel pipeline to be faster due to having more workers
+in parallel, as well as starting the model's transformations at the same
+time that other batches are being preprocessed, using queues.
 
 ..
-    Copyright 2019, Neuraxio Inc.
+    Copyright 2021, Neuraxio Inc.
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.

@@ -25,47 +43,49 @@
 import numpy as np
 
 from neuraxle.distributed.streaming import SequentialQueuedPipeline
-from neuraxle.pipeline import Pipeline
+from neuraxle.pipeline import BasePipeline, Pipeline, MiniBatchSequentialPipeline
 from neuraxle.steps.loop import ForEach
 from neuraxle.steps.misc import Sleep
 from neuraxle.steps.numpy import MultiplyByN
 
 
-def main():
-    """
-    Process tasks of batch size 10 with 8 queued workers that have a max queue size of 10.
-    Each task doest the following: For each data input, sleep 0.02 seconds, and multiply by 2.
-    """
-    sleep_time = 0.02
-    p = SequentialQueuedPipeline([
-        Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
-    ], n_workers_per_step=8, max_queue_size=10, batch_size=10)
-
+def eval_run_time(pipeline: BasePipeline):
+    pipeline.setup()
     a = time.time()
-    outputs_streaming = p.transform(list(range(100)))
+    output = pipeline.transform(list(range(100)))
     b = time.time()
-    time_queued_pipeline = b - a
-    print('SequentialQueuedPipeline')
-    print('execution time: {} seconds'.format(time_queued_pipeline))
+    seconds = b - a
+    return seconds, output
+
 
+def main():
     """
-    Process data inputs sequentially.
-    For each data input, sleep 0.02 seconds, and then multiply by 2.
+    The task is to sleep 0.02 seconds for each data input and then multiply by 2.
     """
-    p = Pipeline([
-        Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
-    ])
-
-    a = time.time()
-    outputs_vanilla = p.transform(list(range(100)))
-    b = time.time()
-    time_vanilla_pipeline = b - a
-
-    print('VanillaPipeline')
-    print('execution time: {} seconds'.format(time_vanilla_pipeline))
-
-    assert time_queued_pipeline < time_vanilla_pipeline
-    assert np.array_equal(outputs_streaming, outputs_vanilla)
+    sleep_time = 0.02
+    preprocessing_and_model_steps = [ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]
+
+    # Classical pipeline - all at once with one big batch:
+    p = Pipeline(preprocessing_and_model_steps)
+    time_vanilla_pipeline, output_classical = eval_run_time(p)
+    print(f"Classical 'Pipeline' execution time: {time_vanilla_pipeline} seconds.")
+
+    # Classical minibatch pipeline - minibatch size 10:
+    p = MiniBatchSequentialPipeline(preprocessing_and_model_steps,
+                                    batch_size=10)
+    time_minibatch_pipeline, output_minibatch = eval_run_time(p)
+    print(f"Minibatched 'MiniBatchSequentialPipeline' execution time: {time_minibatch_pipeline} seconds.")
+
+    # Parallel pipeline - minibatch size 10 with 8 workers per step that
+    # have a max queue size of 5 batches between preprocessing and the model:
+    p = SequentialQueuedPipeline(preprocessing_and_model_steps,
+                                 n_workers_per_step=8, max_queue_size=5, batch_size=10)
+    time_parallel_pipeline, output_parallel = eval_run_time(p)
+    print(f"Parallel 'SequentialQueuedPipeline' execution time: {time_parallel_pipeline} seconds.")
+
+    assert time_parallel_pipeline < time_minibatch_pipeline, str((time_parallel_pipeline, time_vanilla_pipeline))
+    assert np.array_equal(output_classical, output_minibatch)
+    assert np.array_equal(output_classical, output_parallel)
 
 
 if __name__ == '__main__':
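
A quick back-of-envelope check on why the parallel pipeline should win (assuming queue and worker startup overhead is small): the sleeps dominate, so the classical and minibatch pipelines both pay the full 100 × 0.02 s = 2 s of sleeping sequentially, while the parallel pipeline spreads its ten 10-element batches over 8 workers, for roughly ceil(10 / 8) × 10 × 0.02 s ≈ 0.4 s of sleeping, with the MultiplyByN model step overlapping batches that are still being preprocessed.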

neuraxle/base.py

Lines changed: 1 addition & 1 deletion

@@ -2242,7 +2242,7 @@ def _teardown(self) -> 'BaseTransformer':
         :return: self
         """
         self.is_initialized = False
-        return self
+        return RecursiveDict()
 
     def __del__(self):
         try:
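
A minimal sketch of what the new return value means for custom steps (hypothetical subclass; assuming overrides should keep chaining up so the RecursiveDict returned by the base propagates):

from neuraxle.base import BaseTransformer


class DatabaseBackedStep(BaseTransformer):
    """Hypothetical step holding a resource to release on teardown."""

    def _teardown(self):
        self.connection = None  # release the hypothetical resource
        # Chain up: the base _teardown() now returns a RecursiveDict, not self.
        return super()._teardown()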

neuraxle/distributed/streaming.py

Lines changed: 10 additions & 11 deletions

@@ -59,7 +59,7 @@ class ObservableQueueMixin(MixinForBaseTransformer):
         :class:`SequentialQueuedPipeline`
     """
 
-    def __init__(self, queue):
+    def __init__(self, queue: Queue):
         MixinForBaseTransformer.__init__(self)
         self.queue = queue
         self.observers = []

@@ -176,7 +176,7 @@ def __init__(
             additional_worker_arguments = [[] for _ in range(n_workers)]
 
         MetaStep.__init__(self, wrapped)
-        ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size))
+        ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size))  # max_queue_size is in batches
 
         self.use_processes: bool = use_processes
         self.workers: List[Process] = []

@@ -345,17 +345,16 @@ class BaseQueuedPipeline(MiniBatchSequentialPipeline):
     ], batch_size=10, max_queue_size=5)
     outputs = p.transform(list(range(100)))
 
-    :param steps: pipeline steps
-    :param batch_size: number of elements to combine into a single batch
-    :param n_workers_per_step: number of workers to spawn per step
-    :param max_queue_size: max number of elements inside the processing queue
-    :param data_joiner: transformer step to join streamed batches together at the end of the pipeline
+    :param steps: pipeline steps.
+    :param batch_size: number of elements to combine into a single batch.
+    :param n_workers_per_step: number of workers to spawn per step.
+    :param max_queue_size: max number of batches inside the processing queue between the workers.
+    :param data_joiner: transformer step to join streamed batches together at the end of the pipeline.
     :param use_processes: use processes instead of threads for parallel processing. multiprocessing.context.Process is used by default.
     :param use_savers: use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.
-    :param keep_incomplete_batch: (Optional.) A bool representing
-        whether the last batch should be dropped in the case it has fewer than
-        `batch_size` elements; the default behavior is to keep the smaller
-        batch.
+    :param keep_incomplete_batch: (Optional.) A bool that indicates whether
+        or not the last batch should be dropped in the case it has fewer than
+        `batch_size` elements; the default behavior is to keep the smaller batch.
     :param default_value_data_inputs: expected_outputs default fill value
         for padding and values outside iteration range, or :class:`~neuraxle.data_container.DataContainer.AbsentValuesNullObject`
        to trim absent values from the batch
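
A hedged usage sketch exercising the parameters documented above (step choice and values illustrative; per the clarified docstring, max_queue_size now counts batches rather than elements):

from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.steps.numpy import MultiplyByN

p = SequentialQueuedPipeline(
    [MultiplyByN(2)],
    batch_size=10,         # elements combined into one minibatch
    n_workers_per_step=4,  # workers spawned for the step
    max_queue_size=5,      # at most 5 batches buffered in the queue
    use_processes=False,   # threads here; with processes, use_savers is recommended
)
outputs = p.transform(list(range(25)))  # incomplete last batch of 5 is kept by default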

neuraxle/metaopt/auto_ml.py

Lines changed: 30 additions & 33 deletions

@@ -41,8 +41,7 @@
 
 import numpy as np
 
-from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin, \
-    LOGGER_FORMAT, DATE_FORMAT
+from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin
 from neuraxle.data_container import DataContainer
 from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace
 from neuraxle.metaopt.callbacks import BaseCallback, CallbackList, ScoringCallback

@@ -54,6 +53,7 @@
 class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]], ABC):
     """
     Hyperparams repository that saves hyperparams, and scores for every AutoML trial.
+    Cache folder can be changed to do different round numbers.
 
     .. seealso::
         :class:`AutoML`,

@@ -66,10 +66,15 @@ class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]],
         :class:`~neuraxle.hyperparams.space.HyperparameterSamples`
     """
 
-    def __init__(self, hyperparameter_selection_strategy=None, cache_folder=None, best_retrained_model_folder=None):
+    def __init__(
+        self,
+        hyperparameter_selection_strategy: 'BaseHyperparameterSelectionStrategy' = None,
+        cache_folder: str = None,
+        best_retrained_model_folder: str = None,
+    ):
         super().__init__()
         if cache_folder is None:
-            cache_folder = 'trials'
+            cache_folder = os.path.join(f'{self.__class__.__name__}', 'trials')
         if best_retrained_model_folder is None:
             best_retrained_model_folder = os.path.join(cache_folder, 'best')
         self.best_retrained_model_folder = best_retrained_model_folder
155160
self._save_best_model(step, trial_hash)
156161
return step
157162

158-
def new_trial(self, auto_ml_container: 'AutoMLContainer'):
163+
def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial:
159164
"""
160165
Create a new trial with the best next hyperparams.
161166
@@ -164,19 +169,16 @@ def new_trial(self, auto_ml_container: 'AutoMLContainer'):
164169
:return: trial
165170
"""
166171
hyperparams = self.hyperparameter_selection_strategy.find_next_best_hyperparams(auto_ml_container)
167-
logger = self._create_logger_for_trial(auto_ml_container.trial_number)
168-
logger.info('\nnew trial: {}'.format(json.dumps(hyperparams.to_nested_dict(), sort_keys=True, indent=4)))
169172

170173
trial = Trial(
174+
trial_number=auto_ml_container.trial_number,
171175
hyperparams=hyperparams,
172176
save_trial_function=self.save_trial,
173-
logger=logger,
174177
cache_folder=self.cache_folder,
175178
main_metric_name=auto_ml_container.main_scoring_metric_name
176179
)
177180
return trial
178181

179-
180182
def _get_trial_hash(self, hp_dict):
181183
"""
182184
Hash hyperparams with md5 to create a trial hash.
@@ -187,19 +189,6 @@ def _get_trial_hash(self, hp_dict):
187189
current_hyperparameters_hash = hashlib.md5(str.encode(str(hp_dict))).hexdigest()
188190
return current_hyperparameters_hash
189191

190-
def _create_logger_for_trial(self, trial_number) -> logging.Logger:
191-
192-
os.makedirs(self.cache_folder, exist_ok=True)
193-
194-
logfile_path = os.path.join(self.cache_folder, f"trial_{trial_number}.log")
195-
logger_name = f"trial_{trial_number}"
196-
logger = logging.getLogger(logger_name)
197-
formatter = logging.Formatter(fmt=LOGGER_FORMAT, datefmt=DATE_FORMAT)
198-
file_handler = logging.FileHandler(filename=logfile_path)
199-
file_handler.setFormatter(formatter)
200-
logger.addHandler(file_handler)
201-
return logger
202-
203192

204193
class InMemoryHyperparamsRepository(HyperparamsRepository):
205194
"""
@@ -329,14 +318,14 @@ def _save_trial(self, trial: 'Trial'):
329318
# Sleeping to have a valid time difference between files when reloading them to sort them by creation time:
330319
time.sleep(0.1)
331320

332-
def new_trial(self, auto_ml_container: 'AutoMLContainer'):
321+
def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial:
333322
"""
334323
Create new hyperperams trial json file.
335324
336325
:param auto_ml_container: auto ml container
337326
:return:
338327
"""
339-
trial = HyperparamsRepository.new_trial(self, auto_ml_container)
328+
trial: Trial = HyperparamsRepository.new_trial(self, auto_ml_container)
340329
self._save_trial(trial)
341330

342331
return trial
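
The _get_trial_hash helper shown in context above is easy to check in isolation; a runnable sketch of the same md5 scheme (the hyperparameter dict is illustrative):

import hashlib

hp_dict = {'learning_rate': 0.01, 'n_estimators': 100}  # illustrative hyperparams
trial_hash = hashlib.md5(str.encode(str(hp_dict))).hexdigest()
print(trial_hash)  # stable hex digest identifying this hyperparameter combination
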
@@ -346,6 +335,7 @@ def load_all_trials(self, status: 'TRIAL_STATUS' = None) -> 'Trials':
         Load all hyperparameter trials with their corresponding score.
         Reads all the saved trial json files, sorted by creation date.
 
+        :param status: (optional) filter to select only trials with this status.
         :return: (hyperparams, scores)
         """
         trials = Trials()

@@ -370,7 +360,8 @@ def getmtimens(filename):
             if status is None or trial_json['status'] == status.value:
                 trials.append(Trial.from_json(
                     update_trial_function=self.save_trial,
-                    trial_json=trial_json
+                    trial_json=trial_json,
+                    cache_folder=self.cache_folder
                 ))
 
         return trials
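
A hedged call sketch for the newly documented status filter (assuming TRIAL_STATUS is the trial-status enum from neuraxle.metaopt.trial with a SUCCESS member, and that the in-memory repository implements the same load_all_trials signature; neither is shown in this diff):

from neuraxle.metaopt.auto_ml import InMemoryHyperparamsRepository
from neuraxle.metaopt.trial import TRIAL_STATUS  # assumed import path

repo = InMemoryHyperparamsRepository()
successful = repo.load_all_trials(status=TRIAL_STATUS.SUCCESS)  # only successful trials
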
@@ -498,7 +489,14 @@ def __init__(
             hyperparams_repository = InMemoryHyperparamsRepository()
         self.hyperparams_repository: HyperparamsRepository = hyperparams_repository
 
-    def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context: ExecutionContext = None) -> Trial:
+    def train(
+        self,
+        pipeline: BaseStep,
+        data_inputs,
+        expected_outputs=None,
+        context: ExecutionContext = None,
+        trial_number=0
+    ) -> Trial:
         """
         Train pipeline using the validation splitter.
         Track training, and validation metrics for each epoch.

@@ -523,12 +521,12 @@
             logger=context.logger,
             hyperparams=pipeline.get_hyperparams(),
             main_metric_name=self.get_main_metric_name(),
-            save_trial_function=self.hyperparams_repository.save_trial
+            save_trial_function=self.hyperparams_repository.save_trial,
+            trial_number=trial_number
         )
 
         self.execute_trial(
             pipeline=pipeline,
-            trial_number=1,
             repo_trial=repo_trial,
             context=context,
             validation_splits=validation_splits,

@@ -541,7 +539,6 @@
     def execute_trial(
         self,
         pipeline: BaseStep,
-        trial_number: int,
         repo_trial: Trial,
         context: ExecutionContext,
         validation_splits: List[Tuple[DataContainer, DataContainer]],

@@ -576,7 +573,7 @@
             repo_trial=repo_trial,
             repo_trial_split_number=repo_trial_split.split_number,
             validation_splits=validation_splits,
-            trial_number=trial_number,
+            trial_number=repo_trial.trial_number,
             n_trial=n_trial
         )

@@ -867,7 +864,6 @@ def _attempt_trial(self, trial_number, validation_splits, context: ExecutionContext):
         repo_trial_split = self.trainer.execute_trial(
             pipeline=self.pipeline,
             context=context,
-            trial_number=trial_number,
             repo_trial=repo_trial,
             validation_splits=validation_splits,
             n_trial=self.n_trial
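
The common thread in these hunks: the trial number now travels on the Trial object itself instead of being passed alongside it, which also removes the earlier hard-coded trial_number=1. A minimal runnable analogue of the pattern (names hypothetical, not Neuraxle's API):

class FakeTrial:
    """Stand-in for Trial, carrying its own number."""

    def __init__(self, trial_number: int):
        self.trial_number = trial_number


def execute_trial(repo_trial: FakeTrial) -> int:
    # Reads the number off the trial instead of taking a separate argument:
    return repo_trial.trial_number


assert execute_trial(FakeTrial(3)) == 3
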
@@ -1153,8 +1149,9 @@ class ValidationSplitter(BaseValidationSplitter):
     def __init__(self, test_size: float):
         self.test_size = test_size
 
-    def split(self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None) -> Tuple[
-        List, List, List, List]:
+    def split(
+        self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None
+    ) -> Tuple[List, List, List, List]:
         train_data_inputs, train_expected_outputs, train_current_ids, validation_data_inputs, validation_expected_outputs, validation_current_ids = validation_split(
             test_size=self.test_size,
             data_inputs=data_inputs,
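
For scale: ValidationSplitter(test_size=0.20) over 100 data inputs puts roughly 80 in the training split and 20 in the validation split; the hunk above only reformats the signature, leaving the splitting logic untouched.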
