|
1 |
| -import shutil |
2 |
| -import subprocess |
3 |
| -from datetime import datetime |
4 |
| -from pathlib import Path |
5 |
| -from typing import Any, Optional, Union |
6 |
| - |
7 | 1 | import fire
|
8 | 2 |
|
9 | 3 | from rdagent.app.data_science.conf import DS_RD_SETTING
|
10 |
| -from rdagent.components.coder.data_science.ensemble import EnsembleCoSTEER |
11 |
| -from rdagent.components.coder.data_science.ensemble.exp import EnsembleTask |
12 |
| -from rdagent.components.coder.data_science.feature import FeatureCoSTEER |
13 |
| -from rdagent.components.coder.data_science.feature.exp import FeatureTask |
14 |
| -from rdagent.components.coder.data_science.model import ModelCoSTEER |
15 |
| -from rdagent.components.coder.data_science.model.exp import ModelTask |
16 |
| -from rdagent.components.coder.data_science.pipeline import PipelineCoSTEER |
17 |
| -from rdagent.components.coder.data_science.pipeline.exp import PipelineTask |
18 |
| -from rdagent.components.coder.data_science.raw_data_loader import DataLoaderCoSTEER |
19 |
| -from rdagent.components.coder.data_science.raw_data_loader.exp import DataLoaderTask |
20 |
| -from rdagent.components.coder.data_science.share.doc import DocDev |
21 |
| -from rdagent.components.coder.data_science.workflow import WorkflowCoSTEER |
22 |
| -from rdagent.components.coder.data_science.workflow.exp import WorkflowTask |
23 |
| -from rdagent.components.workflow.conf import BasePropSetting |
24 |
| -from rdagent.components.workflow.rd_loop import RDLoop |
25 |
| -from rdagent.core.conf import RD_AGENT_SETTINGS |
26 |
| -from rdagent.core.exception import CoderError, RunnerError |
27 |
| -from rdagent.core.proposal import ExperimentFeedback |
28 |
| -from rdagent.core.scenario import Scenario |
29 | 4 | from rdagent.core.utils import import_class
|
30 | 5 | from rdagent.log import rdagent_logger as logger
|
31 |
| -from rdagent.scenarios.data_science.dev.feedback import DSExperiment2Feedback |
32 |
| -from rdagent.scenarios.data_science.dev.runner import DSCoSTEERRunner |
33 |
| -from rdagent.scenarios.data_science.experiment.experiment import DSExperiment |
34 |
| -from rdagent.scenarios.data_science.proposal.exp_gen import DSExpGen, DSTrace |
35 |
| -from rdagent.scenarios.data_science.proposal.exp_gen.ckp_select import ( |
36 |
| - BackJumpCKPSelector, |
37 |
| - LatestCKPSelector, |
38 |
| - SOTAJumpCKPSelector, |
39 |
| -) |
40 |
| -from rdagent.scenarios.data_science.proposal.exp_gen.idea_pool import DSKnowledgeBase |
41 |
| -from rdagent.scenarios.data_science.proposal.exp_gen.sota_exp_select import ( |
42 |
| - AutoSOTAexpSelector, |
43 |
| - BestValidSelector, |
44 |
| - GlobalSOTASelector, |
45 |
| -) |
46 |
| -from rdagent.scenarios.kaggle.kaggle_crawler import download_data |
47 |
| - |
48 |
| -CKP_SELECTOR_NAME_MAP = { |
49 |
| - "latest": LatestCKPSelector, |
50 |
| - "sota_jump": SOTAJumpCKPSelector, |
51 |
| - "back_jump": BackJumpCKPSelector, |
52 |
| -} |
53 |
| - |
54 |
| -SOTA_EXP_SELECTOR_NAME_MAP = { |
55 |
| - "global_sota": GlobalSOTASelector, |
56 |
| - "auto_sota": AutoSOTAexpSelector, |
57 |
| - "best_valid_sota": BestValidSelector, |
58 |
| -} |
59 |
| - |
60 |
| - |
61 |
| -class DataScienceRDLoop(RDLoop): |
62 |
| - skip_loop_error = (CoderError, RunnerError) |
63 |
| - |
64 |
| - def __init__(self, PROP_SETTING: BasePropSetting): |
65 |
| - logger.log_object(PROP_SETTING.competition, tag="competition") |
66 |
| - scen: Scenario = import_class(PROP_SETTING.scen)(PROP_SETTING.competition) |
67 |
| - |
68 |
| - # 1) task generation from scratch |
69 |
| - # self.scratch_gen: tuple[HypothesisGen, Hypothesis2Experiment] = DummyHypothesisGen(scen), |
70 |
| - |
71 |
| - # 2) task generation from a complete solution |
72 |
| - # self.exp_gen: ExpGen = import_class(PROP_SETTING.exp_gen)(scen) |
73 |
| - |
74 |
| - # self.ckp_selector = CKP_SELECTOR_NAME_MAP[DS_RD_SETTING.selector_name]() |
75 |
| - # self.sota_exp_selector = SOTA_EXP_SELECTOR_NAME_MAP[DS_RD_SETTING.sota_exp_selector_name]() |
76 |
| - self.ckp_selector = import_class(PROP_SETTING.selector_name)() |
77 |
| - self.sota_exp_selector = import_class(PROP_SETTING.sota_exp_selector_name)() |
78 |
| - |
79 |
| - self.exp_gen = import_class(PROP_SETTING.hypothesis_gen)(scen) |
80 |
| - |
81 |
| - # coders |
82 |
| - self.data_loader_coder = DataLoaderCoSTEER(scen) |
83 |
| - self.feature_coder = FeatureCoSTEER(scen) |
84 |
| - self.model_coder = ModelCoSTEER(scen) |
85 |
| - self.ensemble_coder = EnsembleCoSTEER(scen) |
86 |
| - self.workflow_coder = WorkflowCoSTEER(scen) |
87 |
| - |
88 |
| - self.pipeline_coder = PipelineCoSTEER(scen) |
89 |
| - |
90 |
| - self.runner = DSCoSTEERRunner(scen) |
91 |
| - if DS_RD_SETTING.enable_doc_dev: |
92 |
| - self.docdev = DocDev(scen) |
93 |
| - # self.summarizer: Experiment2Feedback = import_class(PROP_SETTING.summarizer)(scen) |
94 |
| - # logger.log_object(self.summarizer, tag="summarizer") |
95 |
| - |
96 |
| - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
97 |
| - knowledge_base = DSKnowledgeBase( |
98 |
| - path=DS_RD_SETTING.knowledge_base_path, idea_pool_json_path=DS_RD_SETTING.idea_pool_json_path |
99 |
| - ) |
100 |
| - self.trace = DSTrace(scen=scen, knowledge_base=knowledge_base) |
101 |
| - else: |
102 |
| - self.trace = DSTrace(scen=scen) |
103 |
| - self.summarizer = DSExperiment2Feedback(scen) |
104 |
| - super(RDLoop, self).__init__() |
105 |
| - |
106 |
| - def direct_exp_gen(self, prev_out: dict[str, Any]): |
107 |
| - |
108 |
| - # set the SOTA experiment to submit |
109 |
| - sota_exp_to_submit = self.sota_exp_selector.get_sota_exp_to_submit(self.trace) |
110 |
| - self.trace.set_sota_exp_to_submit(sota_exp_to_submit) |
111 |
| - |
112 |
| - # set the checkpoint to start from |
113 |
| - selection = self.ckp_selector.get_selection(self.trace) |
114 |
| - exp = self.exp_gen.gen(self.trace, selection) |
115 |
| - logger.log_object(exp) |
116 |
| - |
117 |
| - # FIXME: this is for LLM debug webapp, remove this when the debugging is done. |
118 |
| - logger.log_object(exp, tag="debug_exp_gen") |
119 |
| - return exp |
120 |
| - |
121 |
| - def coding(self, prev_out: dict[str, Any]): |
122 |
| - exp = prev_out["direct_exp_gen"] |
123 |
| - for tasks in exp.pending_tasks_list: |
124 |
| - exp.sub_tasks = tasks |
125 |
| - with logger.tag(f"{exp.sub_tasks[0].__class__.__name__}"): |
126 |
| - if isinstance(exp.sub_tasks[0], DataLoaderTask): |
127 |
| - exp = self.data_loader_coder.develop(exp) |
128 |
| - elif isinstance(exp.sub_tasks[0], FeatureTask): |
129 |
| - exp = self.feature_coder.develop(exp) |
130 |
| - elif isinstance(exp.sub_tasks[0], ModelTask): |
131 |
| - exp = self.model_coder.develop(exp) |
132 |
| - elif isinstance(exp.sub_tasks[0], EnsembleTask): |
133 |
| - exp = self.ensemble_coder.develop(exp) |
134 |
| - elif isinstance(exp.sub_tasks[0], WorkflowTask): |
135 |
| - exp = self.workflow_coder.develop(exp) |
136 |
| - elif isinstance(exp.sub_tasks[0], PipelineTask): |
137 |
| - exp = self.pipeline_coder.develop(exp) |
138 |
| - else: |
139 |
| - raise NotImplementedError(f"Unsupported component in DataScienceRDLoop: {exp.hypothesis.component}") |
140 |
| - exp.sub_tasks = [] |
141 |
| - logger.log_object(exp) |
142 |
| - return exp |
143 |
| - |
144 |
| - def running(self, prev_out: dict[str, Any]): |
145 |
| - exp: DSExperiment = prev_out["coding"] |
146 |
| - if exp.is_ready_to_run(): |
147 |
| - new_exp = self.runner.develop(exp) |
148 |
| - logger.log_object(new_exp) |
149 |
| - exp = new_exp |
150 |
| - if DS_RD_SETTING.enable_doc_dev: |
151 |
| - self.docdev.develop(exp) |
152 |
| - return exp |
153 |
| - |
154 |
| - def feedback(self, prev_out: dict[str, Any]) -> ExperimentFeedback: |
155 |
| - """ |
156 |
| - Assumption: |
157 |
| - - If we come to feedback phase, the previous development steps are successful. |
158 |
| - """ |
159 |
| - exp: DSExperiment = prev_out["running"] |
160 |
| - if self.trace.next_incomplete_component() is None or DS_RD_SETTING.coder_on_whole_pipeline: |
161 |
| - # we have alreadly completed components in previous trace. So current loop is focusing on a new proposed idea. |
162 |
| - # So we need feedback for the proposal. |
163 |
| - feedback = self.summarizer.generate_feedback(exp, self.trace) |
164 |
| - else: |
165 |
| - # Otherwise, it is on drafting stage, don't need complicated feedbacks. |
166 |
| - feedback = ExperimentFeedback( |
167 |
| - reason=f"{exp.hypothesis.component} is completed.", |
168 |
| - decision=True, |
169 |
| - ) |
170 |
| - logger.log_object(feedback) |
171 |
| - return feedback |
172 |
| - |
173 |
| - def record(self, prev_out: dict[str, Any]): |
174 |
| - # set the DAG parent for the trace |
175 |
| - self.trace.sync_dag_parent_and_hist() |
176 |
| - |
177 |
| - e = prev_out.get(self.EXCEPTION_KEY, None) |
178 |
| - if e is None: |
179 |
| - self.trace.hist.append((prev_out["running"], prev_out["feedback"])) |
180 |
| - else: |
181 |
| - self.trace.hist.append( |
182 |
| - ( |
183 |
| - prev_out["direct_exp_gen"] if isinstance(e, CoderError) else prev_out["coding"], |
184 |
| - ExperimentFeedback.from_exception(e), |
185 |
| - ) |
186 |
| - ) |
187 |
| - if self.trace.sota_experiment() is None: |
188 |
| - if DS_RD_SETTING.coder_on_whole_pipeline: |
189 |
| - # check if feedback is not generated |
190 |
| - if len(self.trace.hist) >= DS_RD_SETTING.coding_fail_reanalyze_threshold: |
191 |
| - recent_hist = self.trace.hist[-DS_RD_SETTING.coding_fail_reanalyze_threshold :] |
192 |
| - if all(isinstance(fb.exception, (CoderError, RunnerError)) for _, fb in recent_hist): |
193 |
| - new_scen = self.trace.scen |
194 |
| - if hasattr(new_scen, "reanalyze_competition_description"): |
195 |
| - logger.info( |
196 |
| - "Reanalyzing the competition description after three consecutive coding failures." |
197 |
| - ) |
198 |
| - new_scen.reanalyze_competition_description() |
199 |
| - self.trace.scen = new_scen |
200 |
| - else: |
201 |
| - logger.info("Can not reanalyze the competition description.") |
202 |
| - elif len(self.trace.hist) >= DS_RD_SETTING.consecutive_errors: |
203 |
| - # if {in inital/drafting stage} and {tried enough times} |
204 |
| - for _, fb in self.trace.hist[-DS_RD_SETTING.consecutive_errors :]: |
205 |
| - if fb: |
206 |
| - break # any success will stop restarting. |
207 |
| - else: # otherwise restart it |
208 |
| - logger.error("Consecutive errors reached the limit. Dumping trace.") |
209 |
| - logger.log_object(self.trace, tag="trace before restart") |
210 |
| - self.trace = DSTrace(scen=self.trace.scen, knowledge_base=self.trace.knowledge_base) |
211 |
| - |
212 |
| - logger.log_object(self.trace, tag="trace") |
213 |
| - logger.log_object(self.trace.sota_experiment(), tag="SOTA experiment") |
214 |
| - |
215 |
| - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
216 |
| - logger.log_object(self.trace.knowledge_base, tag="knowledge_base") |
217 |
| - self.trace.knowledge_base.dump() |
218 |
| - |
219 |
| - if ( |
220 |
| - DS_RD_SETTING.enable_log_archive |
221 |
| - and DS_RD_SETTING.log_archive_path is not None |
222 |
| - and Path(DS_RD_SETTING.log_archive_path).is_dir() |
223 |
| - ): |
224 |
| - start_archive_datetime = datetime.now() |
225 |
| - logger.info(f"Archiving log and workspace folder after loop {self.loop_idx}") |
226 |
| - mid_log_tar_path = ( |
227 |
| - Path( |
228 |
| - DS_RD_SETTING.log_archive_temp_path |
229 |
| - if DS_RD_SETTING.log_archive_temp_path |
230 |
| - else DS_RD_SETTING.log_archive_path |
231 |
| - ) |
232 |
| - / "mid_log.tar" |
233 |
| - ) |
234 |
| - mid_workspace_tar_path = ( |
235 |
| - Path( |
236 |
| - DS_RD_SETTING.log_archive_temp_path |
237 |
| - if DS_RD_SETTING.log_archive_temp_path |
238 |
| - else DS_RD_SETTING.log_archive_path |
239 |
| - ) |
240 |
| - / "mid_workspace.tar" |
241 |
| - ) |
242 |
| - subprocess.run(["tar", "-cf", str(mid_log_tar_path), "-C", (Path().cwd() / "log"), "."], check=True) |
243 |
| - |
244 |
| - # remove all files and folders in the workspace except for .py, .md, and .csv files to avoid large workspace dump |
245 |
| - for workspace_id in Path(RD_AGENT_SETTINGS.workspace_path).iterdir(): |
246 |
| - for file_and_folder in workspace_id.iterdir(): |
247 |
| - if file_and_folder.is_dir(): |
248 |
| - shutil.rmtree(file_and_folder) |
249 |
| - elif file_and_folder.is_file() and file_and_folder.suffix not in [".py", ".md", ".csv"]: |
250 |
| - file_and_folder.unlink() |
251 |
| - |
252 |
| - subprocess.run( |
253 |
| - ["tar", "-cf", str(mid_workspace_tar_path), "-C", (RD_AGENT_SETTINGS.workspace_path), "."], check=True |
254 |
| - ) |
255 |
| - if DS_RD_SETTING.log_archive_temp_path is not None: |
256 |
| - shutil.move(mid_log_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_log.tar") |
257 |
| - mid_log_tar_path = Path(DS_RD_SETTING.log_archive_path) / "mid_log.tar" |
258 |
| - shutil.move(mid_workspace_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_workspace.tar") |
259 |
| - mid_workspace_tar_path = Path(DS_RD_SETTING.log_archive_path) / "mid_workspace.tar" |
260 |
| - shutil.copy( |
261 |
| - mid_log_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_log_bak.tar" |
262 |
| - ) # backup when upper code line is killed when running |
263 |
| - shutil.copy( |
264 |
| - mid_workspace_tar_path, Path(DS_RD_SETTING.log_archive_path) / "mid_workspace_bak.tar" |
265 |
| - ) # backup when upper code line is killed when running |
266 |
| - self.timer.add_duration(datetime.now() - start_archive_datetime) |
267 |
| - |
268 |
| - @classmethod |
269 |
| - def load( |
270 |
| - cls, |
271 |
| - path: Union[str, Path], |
272 |
| - output_path: Optional[Union[str, Path]] = None, |
273 |
| - do_truncate: bool = False, |
274 |
| - replace_timer: bool = True, |
275 |
| - ) -> "LoopBase": |
276 |
| - session = super().load(path, output_path, do_truncate, replace_timer) |
277 |
| - logger.log_object(DS_RD_SETTING.competition, tag="competition") # NOTE: necessary to make mle_summary work. |
278 |
| - if DS_RD_SETTING.enable_knowledge_base and DS_RD_SETTING.knowledge_base_version == "v1": |
279 |
| - session.trace.knowledge_base = DSKnowledgeBase( |
280 |
| - path=DS_RD_SETTING.knowledge_base_path, idea_pool_json_path=DS_RD_SETTING.idea_pool_json_path |
281 |
| - ) |
282 |
| - return session |
283 |
| - |
284 |
| - def dump(self, path: str | Path) -> None: |
285 |
| - """ |
286 |
| - Since knowledge_base is big and we don't want to dump it every time |
287 |
| - So we remove it from the trace before dumping and restore it after. |
288 |
| - """ |
289 |
| - backup_knowledge_base = None |
290 |
| - if self.trace.knowledge_base is not None: |
291 |
| - backup_knowledge_base = self.trace.knowledge_base |
292 |
| - self.trace.knowledge_base = None |
293 |
| - super().dump(path) |
294 |
| - if backup_knowledge_base is not None: |
295 |
| - self.trace.knowledge_base = backup_knowledge_base |
| 6 | +from rdagent.scenarios.data_science.loop import DataScienceRDLoop |
296 | 7 |
|
297 | 8 |
|
298 | 9 | def main(
|
|
0 commit comments