Skip to content

Commit 2b7371e

Browse files
you-n-gXu
authored andcommitted
feat: parallel loop running based on asyncio (#932)
* refactor: split workflow into pkg, add WorkflowTracker & wait_retry * feat: add async LoopBase with parallel workers and step semaphores * fix: replace pickle with dill and run blocking tasks via joblib wrapper * feat: add log format settings, dynamic parallelism & pickle-based snapshot * fix: default step semaphore to 1 and avoid subprocess when single worker * merge bowen's changes * merge tim's changes * refactor: extract component task mapping, add conditional logger setup * lint * refactor: add type hints and safer remain_time metric logging in workflow * lint * fix: allow BadRequestError to be pickled via custom copyreg reducer * fix: stop loop when LoopTerminationError is raised in LoopBase * lint * refactor: make log tag context-local using ContextVar for thread safety * feat: add subproc_step flag and helper to decide subprocess execution * fix: use ./cache path and normalize relative volume bind paths * fix: reset loop_idx to 0 on loop restart/resume to ensure correct flow * fix: avoid chmod on cache and input dirs in Env timeout wrapper * fix: skip chmod on 'cache' and 'input' dirs using find -prune * fix: restrict chmod to immediate mount dirs excluding cache/input * fix: chmod cache and input dirs alongside their contents after entry run * fix: guard chmod with directory checks for cache and input * fix: prefix mount_path in chmod command for cache/input dirs * fix: drop quotes from find exclude patterns to ensure chmod executes * fix: skip chmod on cache/input directories to avoid warning spam * feat: support string volume mappings and poll subprocess stdout/stderr * support remove symbolic link * test: use dynamic home path and code volume in LocalEnv local_simple * fix: skip trace and progress update when loop step is withdrawn * refactor: add clean_workspace util and non-destructive workspace backup * fix: preserve symlinks when backing up workspace with copytree * fix: prevent AttributeError when _pbar not yet initialized in LoopBase * perf: replace shutil.copytree with rsync for faster workspace backup * fix: cast log directory Path to str in tar command of data science loop * fix: use portable 'cp -r -P' instead of rsync for workspace backup * fix: add retry and logging to workspace backup for robustness * refactor: extract backup_folder helper and reuse in DataScienceRDLoop * fix: propagate backup errors & default _pbar getattr to avoid error * fix the division by zero bug * refactor: execute RD loops via asyncio.run and add necessary imports * lint * lint * lint --------- Co-authored-by: Xu <[email protected]>
1 parent 8fbde58 commit 2b7371e

File tree

26 files changed

+937
-517
lines changed

26 files changed

+937
-517
lines changed

rdagent/app/data_mining/model.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import fire
24

35
from rdagent.app.data_mining.conf import MED_PROP_SETTING
@@ -24,7 +26,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
2426
model_loop = ModelRDLoop(MED_PROP_SETTING)
2527
else:
2628
model_loop = ModelRDLoop.load(path, checkout=checkout)
27-
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
29+
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))
2830

2931

3032
if __name__ == "__main__":

rdagent/app/data_science/loop.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from pathlib import Path
23

34
import fire
@@ -66,7 +67,7 @@ def main(
6667
if exp_gen_cls is not None:
6768
kaggle_loop.exp_gen = import_class(exp_gen_cls)(kaggle_loop.exp_gen.scen)
6869

69-
kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=timeout)
70+
asyncio.run(kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=timeout))
7071

7172

7273
if __name__ == "__main__":

rdagent/app/qlib_rd_loop/factor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Factor workflow with session control
33
"""
44

5+
import asyncio
56
from typing import Any
67

78
import fire
@@ -40,7 +41,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
4041
model_loop = FactorRDLoop(FACTOR_PROP_SETTING)
4142
else:
4243
model_loop = FactorRDLoop.load(path, checkout=checkout)
43-
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
44+
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))
4445

4546

4647
if __name__ == "__main__":

rdagent/app/qlib_rd_loop/factor_from_report.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
from pathlib import Path
34
from typing import Any, Dict, Tuple
@@ -162,7 +163,7 @@ def main(report_folder=None, path=None, step_n=None, loop_n=None, all_duration=N
162163
else:
163164
model_loop = FactorReportLoop(report_folder=report_folder)
164165

165-
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
166+
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))
166167

167168

168169
if __name__ == "__main__":

rdagent/app/qlib_rd_loop/model.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
Model workflow with session control
33
"""
44

5+
import asyncio
6+
57
import fire
68

79
from rdagent.app.qlib_rd_loop.conf import MODEL_PROP_SETTING
@@ -28,7 +30,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
2830
model_loop = ModelRDLoop(MODEL_PROP_SETTING)
2931
else:
3032
model_loop = ModelRDLoop.load(path, checkout=checkout)
31-
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
33+
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))
3234

3335

3436
if __name__ == "__main__":

rdagent/app/qlib_rd_loop/quant.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Quant (Factor & Model) workflow with session control
33
"""
44

5+
import asyncio
56
from typing import Any
67

78
import fire
@@ -130,7 +131,8 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
130131
quant_loop = QuantRDLoop(QUANT_PROP_SETTING)
131132
else:
132133
quant_loop = QuantRDLoop.load(path, checkout=checkout)
133-
quant_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
134+
135+
asyncio.run(quant_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))
134136

135137

136138
if __name__ == "__main__":

rdagent/components/coder/data_science/feature/prompts.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ feature_coder:
4646
5. You should use the following cache decorator to cache the results of the function:
4747
```python
4848
from joblib import Memory
49-
memory = Memory(location='/tmp/cache', verbose=0)
49+
memory = Memory(location='./cache', verbose=0)
5050
@memory.cache```
5151
6. Coding tricks:
52-
- If the input consists of a batch of file paths and you need to modify the file contents to complete your feature engineering task, you can accomplish your feature engineering task by modifying these files and creating new files in a subfolder within "/tmp/cache" (this path is persistent, otherwise you may lose your created file). Then the new file paths are returned.
52+
- If the input consists of a batch of file paths and you need to modify the file contents to complete your feature engineering task, you can accomplish your feature engineering task by modifying these files and creating new files in a subfolder within "./cache" (this path is persistent, otherwise you may lose your created file). Then the new file paths are returned.
5353
5454
{% include "scenarios.data_science.share:guidelines.coding" %}
5555

rdagent/components/coder/data_science/model/prompts.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ model_coder:
4343
4. You should use the following cache decorator to cache the results of the function:
4444
```python
4545
from joblib import Memory
46-
memory = Memory(location='/tmp/cache', verbose=0)
46+
memory = Memory(location='./cache', verbose=0)
4747
@memory.cache``
4848
{% include "scenarios.data_science.share:guidelines.coding" %}
4949

rdagent/components/coder/data_science/raw_data_loader/prompts.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ data_loader_coder:
273273
3. You should use the following cache decorator to cache the results of the function:
274274
```python
275275
from joblib import Memory
276-
memory = Memory(location='/tmp/cache', verbose=0)
276+
memory = Memory(location='./cache', verbose=0)
277277
@memory.cache```
278278
{% include "scenarios.data_science.share:guidelines.coding" %}
279279

rdagent/core/conf.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,5 +78,24 @@ class RDAgentSettings(ExtendedBaseSettings):
7878

7979
initial_fator_library_size: int = 20
8080

81+
# parallel loop
82+
step_semaphore: int | dict[str, int] = 1
83+
"""the semaphore for each step; you can specify a overall semaphore
84+
or a step-wise semaphore like {"coding": 3, "running": 2}"""
85+
86+
def get_max_parallel(self) -> int:
87+
"""Based on the setting of semaphore, return the maximum number of parallel loops"""
88+
if isinstance(self.step_semaphore, int):
89+
return self.step_semaphore
90+
else:
91+
return max(self.step_semaphore.values())
92+
93+
# NOTE: for debug
94+
# the following function only serves as debugging and is necessary in main logic.
95+
subproc_step: bool = False
96+
97+
def is_force_subproc(self) -> bool:
98+
return self.subproc_step or self.get_max_parallel() > 1
99+
81100

82101
RD_AGENT_SETTINGS = RDAgentSettings()

0 commit comments

Comments
 (0)