feat: parallel loop running based on asyncio #932

Merged
merged 48 commits
Jun 12, 2025
94f9b28
refactor: split workflow into pkg, add WorkflowTracker & wait_retry
you-n-g Jun 4, 2025
045915c
feat: add async LoopBase with parallel workers and step semaphores
you-n-g Jun 4, 2025
987e084
fix: replace pickle with dill and run blocking tasks via joblib wrapper
you-n-g Jun 5, 2025
55bd378
feat: add log format settings, dynamic parallelism & pickle-based sna…
you-n-g Jun 6, 2025
c4246b4
fix: default step semaphore to 1 and avoid subprocess when single worker
you-n-g Jun 6, 2025
0214af6
Merge remote-tracking branch 'origin/HEAD' into multi-proc
you-n-g Jun 6, 2025
8ddf84a
merge bowen's changes
you-n-g Jun 6, 2025
f8135e1
merge tim's changes
you-n-g Jun 6, 2025
368d4bc
refactor: extract component task mapping, add conditional logger setup
you-n-g Jun 6, 2025
5418369
lint
you-n-g Jun 6, 2025
cd75559
refactor: add type hints and safer remain_time metric logging in work…
you-n-g Jun 6, 2025
e751a09
lint
you-n-g Jun 6, 2025
0b5c749
fix: allow BadRequestError to be pickled via custom copyreg reducer
you-n-g Jun 7, 2025
1bfae17
fix: stop loop when LoopTerminationError is raised in LoopBase
you-n-g Jun 7, 2025
1e33f31
lint
you-n-g Jun 7, 2025
9aa7ea0
refactor: make log tag context-local using ContextVar for thread safety
you-n-g Jun 7, 2025
94269cf
feat: add subproc_step flag and helper to decide subprocess execution
you-n-g Jun 9, 2025
1c24439
fix: use ./cache path and normalize relative volume bind paths
you-n-g Jun 9, 2025
24a9220
fix: reset loop_idx to 0 on loop restart/resume to ensure correct flow
you-n-g Jun 9, 2025
3251f73
Merge remote-tracking branch 'origin/HEAD' into multi-proc
you-n-g Jun 9, 2025
9dd7fe9
Merge remote-tracking branch 'origin/HEAD' into multi-proc
you-n-g Jun 9, 2025
17301be
fix: avoid chmod on cache and input dirs in Env timeout wrapper
you-n-g Jun 9, 2025
e9944ad
fix: skip chmod on 'cache' and 'input' dirs using find -prune
you-n-g Jun 9, 2025
49683f0
fix: restrict chmod to immediate mount dirs excluding cache/input
you-n-g Jun 9, 2025
2e6f613
fix: chmod cache and input dirs alongside their contents after entry run
you-n-g Jun 9, 2025
688c593
fix: guard chmod with directory checks for cache and input
you-n-g Jun 9, 2025
4a18de2
fix: prefix mount_path in chmod command for cache/input dirs
you-n-g Jun 9, 2025
507f719
fix: drop quotes from find exclude patterns to ensure chmod executes
you-n-g Jun 9, 2025
49e11a7
fix: skip chmod on cache/input directories to avoid warning spam
you-n-g Jun 9, 2025
bd97be6
feat: support string volume mappings and poll subprocess stdout/stderr
you-n-g Jun 9, 2025
9422183
support remove symbolic link
you-n-g Jun 9, 2025
08b9441
test: use dynamic home path and code volume in LocalEnv local_simple
you-n-g Jun 9, 2025
f36e29e
fix: skip trace and progress update when loop step is withdrawn
you-n-g Jun 10, 2025
f66093c
refactor: add clean_workspace util and non-destructive workspace backup
you-n-g Jun 10, 2025
0c86eb5
fix: preserve symlinks when backing up workspace with copytree
you-n-g Jun 10, 2025
6ae65bb
fix: prevent AttributeError when _pbar not yet initialized in LoopBase
you-n-g Jun 10, 2025
0819027
perf: replace shutil.copytree with rsync for faster workspace backup
you-n-g Jun 10, 2025
b87a7dc
fix: cast log directory Path to str in tar command of data science loop
you-n-g Jun 10, 2025
86b737f
fix: use portable 'cp -r -P' instead of rsync for workspace backup
you-n-g Jun 10, 2025
953254a
fix: add retry and logging to workspace backup for robustness
you-n-g Jun 11, 2025
9674dce
refactor: extract backup_folder helper and reuse in DataScienceRDLoop
you-n-g Jun 11, 2025
130b118
fix: propagate backup errors & default _pbar getattr to avoid error
you-n-g Jun 11, 2025
e012f69
fix the division by zero bug
Jun 11, 2025
eab79f3
refactor: execute RD loops via asyncio.run and add necessary imports
you-n-g Jun 11, 2025
cf1f163
lint
you-n-g Jun 11, 2025
4a158ec
lint
you-n-g Jun 11, 2025
1ed7611
lint
you-n-g Jun 11, 2025
5e19f25
Merge remote-tracking branch 'origin/HEAD' into multi-proc
you-n-g Jun 12, 2025
4 changes: 3 additions & 1 deletion rdagent/app/data_mining/model.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

import fire

from rdagent.app.data_mining.conf import MED_PROP_SETTING
@@ -24,7 +26,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
model_loop = ModelRDLoop(MED_PROP_SETTING)
else:
model_loop = ModelRDLoop.load(path, checkout=checkout)
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
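The change repeated across these entry points is the same one-liner: `run()` is now a coroutine, so callers wrap it in `asyncio.run`. A minimal sketch of the idiom (class and argument names are illustrative, not the real `ModelRDLoop` API):

```python
import asyncio

class Loop:
    async def run(self, step_n=None):
        # Formerly a blocking method; as a coroutine, several loops can
        # now be scheduled concurrently on one event loop.
        await asyncio.sleep(0)  # yield control to other tasks
        return f"ran with step_n={step_n}"

def main(step_n=None):
    loop = Loop()
    # asyncio.run creates the event loop, drives the coroutine to
    # completion, and closes the loop: the new entry-point idiom.
    return asyncio.run(loop.run(step_n=step_n))

print(main(step_n=3))  # ran with step_n=3
```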
3 changes: 2 additions & 1 deletion rdagent/app/data_science/loop.py
@@ -1,3 +1,4 @@
import asyncio
from pathlib import Path

import fire
@@ -66,7 +67,7 @@ def main(
if exp_gen_cls is not None:
kaggle_loop.exp_gen = import_class(exp_gen_cls)(kaggle_loop.exp_gen.scen)

kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=timeout)
asyncio.run(kaggle_loop.run(step_n=step_n, loop_n=loop_n, all_duration=timeout))


if __name__ == "__main__":
3 changes: 2 additions & 1 deletion rdagent/app/qlib_rd_loop/factor.py
@@ -2,6 +2,7 @@
Factor workflow with session control
"""

import asyncio
from typing import Any

import fire
@@ -40,7 +41,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
model_loop = FactorRDLoop(FACTOR_PROP_SETTING)
else:
model_loop = FactorRDLoop.load(path, checkout=checkout)
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
3 changes: 2 additions & 1 deletion rdagent/app/qlib_rd_loop/factor_from_report.py
@@ -1,3 +1,4 @@
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Tuple
@@ -162,7 +163,7 @@ def main(report_folder=None, path=None, step_n=None, loop_n=None, all_duration=N
else:
model_loop = FactorReportLoop(report_folder=report_folder)

model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
4 changes: 3 additions & 1 deletion rdagent/app/qlib_rd_loop/model.py
@@ -2,6 +2,8 @@
Model workflow with session control
"""

import asyncio

import fire

from rdagent.app.qlib_rd_loop.conf import MODEL_PROP_SETTING
@@ -28,7 +30,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
model_loop = ModelRDLoop(MODEL_PROP_SETTING)
else:
model_loop = ModelRDLoop.load(path, checkout=checkout)
model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)
asyncio.run(model_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
4 changes: 3 additions & 1 deletion rdagent/app/qlib_rd_loop/quant.py
@@ -2,6 +2,7 @@
Quant (Factor & Model) workflow with session control
"""

import asyncio
from typing import Any

import fire
@@ -130,7 +131,7 @@ def main(path=None, step_n=None, loop_n=None, all_duration=None, checkout=True):
quant_loop = QuantRDLoop(QUANT_PROP_SETTING)
else:
quant_loop = QuantRDLoop.load(path, checkout=checkout)
quant_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration)

asyncio.run(quant_loop.run(step_n=step_n, loop_n=loop_n, all_duration=all_duration))


if __name__ == "__main__":
4 changes: 2 additions & 2 deletions rdagent/components/coder/data_science/feature/prompts.yaml
@@ -46,10 +46,10 @@ feature_coder:
5. You should use the following cache decorator to cache the results of the function:
```python
from joblib import Memory
memory = Memory(location='/tmp/cache', verbose=0)
memory = Memory(location='./cache', verbose=0)
@memory.cache```
6. Coding tricks:
- If the input consists of a batch of file paths and you need to modify the file contents to complete your feature engineering task, you can accomplish your feature engineering task by modifying these files and creating new files in a subfolder within "/tmp/cache" (this path is persistent, otherwise you may lose your created file). Then the new file paths are returned.
- If the input consists of a batch of file paths and you need to modify the file contents to complete your feature engineering task, you can accomplish your feature engineering task by modifying these files and creating new files in a subfolder within "./cache" (this path is persistent, otherwise you may lose your created file). Then the new file paths are returned.

{% include "scenarios.data_science.share:guidelines.coding" %}

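For context, `joblib.Memory` persists each function result to disk keyed by the arguments, so repeated calls skip recomputation; the sketch below approximates that behaviour with only the standard library (a simplified stand-in, not joblib's actual implementation):

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

# A stdlib-only approximation of what joblib.Memory(location=...).cache does:
# persist each result to disk keyed by the function name and arguments.
def disk_cache(location):
    root = Path(location)
    root.mkdir(parents=True, exist_ok=True)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            key = hashlib.sha256(
                pickle.dumps((fn.__name__, args, sorted(kwargs.items())))
            ).hexdigest()
            path = root / key
            if path.exists():                       # cache hit: skip the body
                return pickle.loads(path.read_bytes())
            result = fn(*args, **kwargs)            # cache miss: compute...
            path.write_bytes(pickle.dumps(result))  # ...and persist
            return result
        return wrapper
    return decorator

calls = []

@disk_cache(tempfile.mkdtemp())
def square(x):
    calls.append(x)  # record real executions
    return x * x

print(square(4), square(4), len(calls))  # 16 16 1 -- the body ran only once
```

This also illustrates why the PR moves the location from `/tmp/cache` to `./cache`: a cache directory relative to the workspace survives workspace backups, while `/tmp` contents may not.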
2 changes: 1 addition & 1 deletion rdagent/components/coder/data_science/model/prompts.yaml
@@ -43,7 +43,7 @@ model_coder:
4. You should use the following cache decorator to cache the results of the function:
```python
from joblib import Memory
memory = Memory(location='/tmp/cache', verbose=0)
memory = Memory(location='./cache', verbose=0)
@memory.cache```
{% include "scenarios.data_science.share:guidelines.coding" %}

@@ -273,7 +273,7 @@ data_loader_coder:
3. You should use the following cache decorator to cache the results of the function:
```python
from joblib import Memory
memory = Memory(location='/tmp/cache', verbose=0)
memory = Memory(location='./cache', verbose=0)
@memory.cache```
{% include "scenarios.data_science.share:guidelines.coding" %}

19 changes: 19 additions & 0 deletions rdagent/core/conf.py
@@ -78,5 +78,24 @@ class RDAgentSettings(ExtendedBaseSettings):

initial_fator_library_size: int = 20

# parallel loop
step_semaphore: int | dict[str, int] = 1
"""the semaphore for each step; you can specify a overall semaphore
or a step-wise semaphore like {"coding": 3, "running": 2}"""

def get_max_parallel(self) -> int:
"""Based on the setting of semaphore, return the maximum number of parallel loops"""
if isinstance(self.step_semaphore, int):
return self.step_semaphore
else:
return max(self.step_semaphore.values())

# NOTE: for debug
# the following flag only serves debugging and is not necessary in the main logic.
subproc_step: bool = False

def is_force_subproc(self) -> bool:
return self.subproc_step or self.get_max_parallel() > 1


RD_AGENT_SETTINGS = RDAgentSettings()
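The `step_semaphore` setting above caps how many loops may occupy a step at once. A sketch of how a per-step configuration could translate into `asyncio.Semaphore` guards (the helper names are hypothetical, not the actual `LoopBase` internals):

```python
import asyncio

def build_step_semaphores(step_semaphore, steps):
    # Mirror of get_max_parallel's two accepted shapes: a single int
    # applied to every step, or a per-step dict like {"coding": 3}.
    if isinstance(step_semaphore, int):
        return {s: asyncio.Semaphore(step_semaphore) for s in steps}
    return {s: asyncio.Semaphore(step_semaphore.get(s, 1)) for s in steps}

async def run_step(name, semaphores, active, peak):
    async with semaphores[name]:   # at most N loops inside this step at once
        active[name] += 1
        peak[name] = max(peak[name], active[name])
        await asyncio.sleep(0.01)  # stand-in for real step work
        active[name] -= 1

async def main():
    steps = ["coding", "running"]
    sems = build_step_semaphores({"coding": 3, "running": 2}, steps)
    active = {s: 0 for s in steps}
    peak = {s: 0 for s in steps}
    await asyncio.gather(*(run_step("coding", sems, active, peak) for _ in range(6)))
    await asyncio.gather(*(run_step("running", sems, active, peak) for _ in range(6)))
    return peak

print(asyncio.run(main()))  # {'coding': 3, 'running': 2}
```

Six tasks contend for each step, but the observed peak concurrency never exceeds the configured cap.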
18 changes: 17 additions & 1 deletion rdagent/core/proposal.py
@@ -2,14 +2,19 @@

from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Generic, List, Tuple, TypeVar
from typing import TYPE_CHECKING, Generic, List, Tuple, TypeVar

from rdagent.core.conf import RD_AGENT_SETTINGS
from rdagent.core.evaluation import Feedback
from rdagent.core.experiment import ASpecificExp, Experiment
from rdagent.core.knowledge_base import KnowledgeBase
from rdagent.core.scenario import Scenario

if TYPE_CHECKING:
from rdagent.utils.workflow.loop import LoopBase


class Hypothesis:
"""
@@ -248,6 +253,17 @@ def gen(self, trace: Trace) -> Experiment:
)
"""

async def async_gen(self, trace: Trace, loop: LoopBase) -> Experiment:
"""
generate the experiment and decide whether to stop yield generation and give up control to other routines.
"""
# we give a default implementation here.
# The proposal is set to try best to generate the experiment in max-parallel level.
while True:
if loop.get_unfinished_loop_cnt(loop.loop_idx) < RD_AGENT_SETTINGS.get_max_parallel():
return self.gen(trace)
await asyncio.sleep(1)
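The default `async_gen` above is effectively a back-pressure valve: it polls once per second until the number of in-flight loops drops below the configured parallelism, then generates synchronously. A runnable sketch of that pattern against a stub loop (all names here are illustrative):

```python
import asyncio

MAX_PARALLEL = 2

class StubLoop:
    def __init__(self):
        self.unfinished = 3  # pretend 3 loops are already in flight

    def get_unfinished_loop_cnt(self):
        return self.unfinished

async def async_gen(loop):
    # Same shape as the default implementation: poll until there is
    # capacity, then hand back a freshly generated experiment.
    while True:
        if loop.get_unfinished_loop_cnt() < MAX_PARALLEL:
            return "experiment"
        await asyncio.sleep(0.01)  # yield so other coroutines can finish loops

async def finisher(loop):
    # Another routine finishing loops while async_gen waits.
    while loop.unfinished >= MAX_PARALLEL:
        await asyncio.sleep(0.02)
        loop.unfinished -= 1

async def main():
    loop = StubLoop()
    exp, _ = await asyncio.gather(async_gen(loop), finisher(loop))
    return exp, loop.unfinished

print(asyncio.run(main()))  # ('experiment', 1)
```

Without the `await asyncio.sleep` inside the polling loop, `async_gen` would monopolize the event loop and the other routines could never make progress.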


class HypothesisGen(ABC):

3 changes: 3 additions & 0 deletions rdagent/log/conf.py
@@ -12,6 +12,9 @@ class LogSettings(ExtendedBaseSettings):

trace_path: str = str(Path.cwd() / "log" / datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S-%f"))

format_console: str | None = None
""""If it is None, leave it as the default"""

ui_server_port: int | None = None

storages: dict[str, list[int | str]] = {}
41 changes: 25 additions & 16 deletions rdagent/log/logger.py
@@ -1,17 +1,24 @@
import os
import sys
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import datetime
from pathlib import Path
from typing import Generator

from loguru import logger

from .conf import LOG_SETTINGS

if LOG_SETTINGS.format_console is not None:
logger.remove()
logger.add(sys.stdout, format=LOG_SETTINGS.format_console)

from psutil import Process

from rdagent.core.utils import SingletonBaseClass, import_class

from .base import Storage
from .conf import LOG_SETTINGS
from .storage import FileStorage
from .utils import get_caller_info

@@ -39,14 +46,16 @@ class RDAgentLog(SingletonBaseClass):

"""

# TODO: Simplify it to introduce less concepts ( We may merge RDAgentLog, Storage &)
# Solution: Storage => PipeLog, View => PipeLogView, RDAgentLog is an instance of PipeLogger
# PipeLogger.info(...) , PipeLogger.get_resp() to get feedback from frontend.
# def f():
# logger = PipeLog()
# logger.info("<code>")
# feedback = logger.get_reps()
_tag: str = ""
# Thread-/coroutine-local tag; in a Linux forked subprocess it is copied to the child process.
_tag_ctx: ContextVar[str] = ContextVar("_tag_ctx", default="")

@property
def _tag(self) -> str: # Get current tag
return self._tag_ctx.get()

@_tag.setter # Set current tag
def _tag(self, value: str) -> None:
self._tag_ctx.set(value)

def __init__(self) -> None:
self.storage = FileStorage(LOG_SETTINGS.trace_path)
@@ -61,15 +70,16 @@ def __init__(self) -> None:
def tag(self, tag: str) -> Generator[None, None, None]:
if tag.strip() == "":
raise ValueError("Tag cannot be empty.")
if self._tag != "":
tag = "." + tag

# TODO: It may result in error in multithreading or co-routine
self._tag = self._tag + tag
# Generate a new complete tag
current_tag = self._tag_ctx.get()
new_tag = tag if current_tag == "" else f"{current_tag}.{tag}"
# Set and save token for later restore
token = self._tag_ctx.set(new_tag)
try:
yield
finally:
self._tag = self._tag[: -len(tag)]
# Restore previous tag (thread/coroutine safe)
self._tag_ctx.reset(token)

def set_storages_path(self, path: str | Path) -> None:
for storage in [self.storage] + self.other_storages:
@@ -96,7 +106,6 @@ def get_pids(self) -> str:
return pid_chain

def log_object(self, obj: object, *, tag: str = "") -> None:
# TODO: I think we can merge the log_object function with other normal log methods to make the interface simpler.
caller_info = get_caller_info()
tag = f"{self._tag}.{tag}.{self.get_pids()}".strip(".")

13 changes: 13 additions & 0 deletions rdagent/oai/backend/litellm.py
@@ -1,7 +1,9 @@
import copyreg
from typing import Any, Literal, cast

import numpy as np
from litellm import (
BadRequestError,
completion,
completion_cost,
embedding,
@@ -15,6 +17,17 @@
from rdagent.oai.llm_conf import LLMSettings


# NOTE: Patching! Otherwise, unpickling the exception calls the constructor and fails with:
# `BadRequestError.__init__() missing 2 required positional arguments: 'model' and 'llm_provider'`
def _reduce_no_init(exc: Exception) -> tuple:
cls = exc.__class__
return (cls.__new__, (cls,), exc.__dict__)


# register the reducer so BadRequestError instances survive pickling
copyreg.pickle(BadRequestError, _reduce_no_init)
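The reducer above sidesteps `__init__` entirely: `copyreg.pickle` tells pickle to rebuild the instance via `cls.__new__` and then restore `__dict__`, so constructors with required positional arguments never run during unpickling. The same trick applied to a self-contained stand-in exception (note that only the instance dict is restored, not the `args` tuple set by `Exception.__init__`):

```python
import copyreg
import pickle

class RequiresArgsError(Exception):
    # Stand-in for BadRequestError: __init__ demands arguments,
    # which breaks default exception unpickling.
    def __init__(self, model: str, llm_provider: str):
        super().__init__(f"{model}/{llm_provider}")
        self.model = model
        self.llm_provider = llm_provider

def _reduce_no_init(exc: Exception) -> tuple:
    cls = exc.__class__
    # Rebuild via __new__ (skipping __init__) and restore the instance dict.
    return (cls.__new__, (cls,), exc.__dict__)

copyreg.pickle(RequiresArgsError, _reduce_no_init)

err = RequiresArgsError("gpt-x", "azure")
restored = pickle.loads(pickle.dumps(err))
print(type(restored).__name__, restored.model, restored.llm_provider)
```

This matters for the PR because exceptions raised inside joblib subprocesses must cross a process boundary via pickling to reach the parent loop.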


class LiteLLMSettings(LLMSettings):

class Config: