feat: data gen pipeline interface modifications #2173
New file (`@@ -0,0 +1,276 @@`):

```python
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import json
import os
from typing import Any, Dict, List, Optional, Union

from camel.logger import get_logger

logger = get_logger(__name__)


class BaseDataGenPipeline:
    r"""Base class for all data generation pipelines.

    Provides a unified interface for data generation pipelines, allowing for
    flexible input and output options. It includes methods for loading data
    from different sources and saving results to files.

    Subclasses should implement the `generate` method to define their
    specific data generation workflow.

    Attributes:
        output_path (Optional[str]): Path to save generated data.
        batch_size (Optional[int]): Batch size for processing data.
        max_workers (Optional[int]): Maximum number of worker threads.
        save_intermediate (bool): Whether to save intermediate results.
    """

    def __init__(
        self,
        output_path: Optional[str] = None,
        batch_size: Optional[int] = None,
        max_workers: Optional[int] = None,
        save_intermediate: bool = False
    ):
        r"""Initialize the base data generation pipeline.

        Args:
            output_path (Optional[str]): Path to save generated data.
                If None, results will only be returned without saving to
                file. (default: :obj:`None`)
            batch_size (Optional[int]): Batch size for processing data.
                (default: :obj:`None`)
            max_workers (Optional[int]): Maximum number of worker threads.
                (default: :obj:`None`)
            save_intermediate (bool): Whether to save intermediate results.
                (default: :obj:`False`)
        """
        self.output_path = output_path
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.save_intermediate = save_intermediate

    def load_data_from_file(self, file_path: str) -> List[Dict[str, Any]]:
        r"""Load data from a JSONL file.

        Args:
            file_path (str): Path to the JSONL file.

        Returns:
            List[Dict[str, Any]]: List of data entries.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    data.append(json.loads(line))
        return data

    def load_data_from_jsonl_str(
        self, jsonl_str: str
    ) -> List[Dict[str, Any]]:
        r"""Load data from a JSONL string.

        Args:
            jsonl_str (str): JSONL formatted string.

        Returns:
            List[Dict[str, Any]]: List of data entries.
        """
        data = []
        for line in jsonl_str.splitlines():
            line = line.strip()
            if line:
                data.append(json.loads(line))
        return data

    def load_data(
        self, data: Union[str, List[Dict[str, Any]]]
    ) -> List[Dict[str, Any]]:
        r"""Unified method for loading data from various formats.

        This method accepts:
        - File path to a JSONL file
        - JSONL string
        - List of dictionaries

        Args:
            data (Union[str, List[Dict[str, Any]]]): Data input which can be
                either a file path, JSONL string, or list of dictionaries.

        Returns:
            List[Dict[str, Any]]: Loaded data as list of dictionaries.

        Raises:
            ValueError: If the data format is invalid or unsupported.
        """
        if isinstance(data, list):
            return data

        if not isinstance(data, str):
            raise ValueError(
                "Data must be either a file path, JSONL string, "
                "or list of dictionaries"
            )

        # Check if it's a file path
        if os.path.exists(data):
            return self.load_data_from_file(data)

        # Try to parse as JSONL string
        try:
            return self.load_data_from_jsonl_str(data)
        except json.JSONDecodeError:
            raise ValueError(
                "Data string could not be parsed as JSONL. "
                "Ensure it's a valid JSONL format."
            )

    def save_results(
        self,
        results: List[Dict[str, Any]],
        output_path: Optional[str] = None,
        results_key: str = "results"
    ) -> None:
        r"""Save results to a JSON file.

        Args:
            results (List[Dict[str, Any]]): Results to save.
            output_path (Optional[str]): Path to save results.
                If None, uses the pipeline's output_path.
                (default: :obj:`None`)
            results_key (str): The key under which to store the results in
                the JSON file. (default: :obj:`"results"`)

        Raises:
            ValueError: If no output path is provided.
        """
        path = output_path or self.output_path
        if not path:
            raise ValueError(
                "No output path provided. Either set output_path during "
                "initialization or provide it to save_results."
            )

        # Ensure the directory exists
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

        with open(path, 'w', encoding='utf-8') as f:
            json.dump({results_key: results}, f, indent=2, ensure_ascii=False)
        logger.info(f"Results saved to {path} under key '{results_key}'")

    def save_jsonl(
        self,
        results: List[Dict[str, Any]],
        output_path: Optional[str] = None
    ) -> None:
        r"""Save results to a JSONL file.

        Args:
            results (List[Dict[str, Any]]): Results to save.
            output_path (Optional[str]): Path to save results.
                If None, uses the pipeline's output_path.
                (default: :obj:`None`)

        Raises:
            ValueError: If no output path is provided.
        """
        path = output_path or self.output_path
        if not path:
            raise ValueError(
                "No output path provided. Either set output_path during "
                "initialization or provide it to save_jsonl."
            )

        # Ensure the directory exists
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

        with open(path, 'w', encoding='utf-8') as f:
            for item in results:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        logger.info(f"Results saved to {path}")

    def safe_write_jsonl(
        self,
        results: List[Dict[str, Any]],
        output_path: Optional[str] = None
    ) -> None:
        r"""Safely write results to a JSONL file using atomic operations.

        Args:
            results (List[Dict[str, Any]]): Results to save.
            output_path (Optional[str]): Path to save results.
                If None, uses the pipeline's output_path.
                (default: :obj:`None`)

        Raises:
            ValueError: If no output path is provided.
        """
        path = output_path or self.output_path
        if not path:
            raise ValueError(
                "No output path provided. Either set output_path during "
                "initialization or provide it to safe_write_jsonl."
            )

        # Ensure the directory exists
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

        # Write to temporary file first
        temp_path = path + ".tmp"
        with open(temp_path, 'w', encoding='utf-8') as f:
            for item in results:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')

        # Replace the original file
        os.replace(temp_path, path)
        logger.info(f"Results safely saved to {path}")

    def execute(self, *args, **kwargs) -> List[Dict[str, Any]]:
        r"""Execute the data generation pipeline.

        This is the primary method for running the pipeline. By default it
        calls `generate` and saves the results when an output path is set;
        subclasses can override it to add pre- or post-processing steps.

        Returns:
            List[Dict[str, Any]]: Generated data.
        """
        results = self.generate(*args, **kwargs)

        # Save results if output_path is specified
        if self.output_path:
            self.save_results(results)

        return results

    def generate(self, *args, **kwargs) -> List[Dict[str, Any]]:
        r"""Generate data based on the pipeline's implementation.

        Subclasses should implement this method to define their specific
        data generation logic.

        Returns:
            List[Dict[str, Any]]: Generated data.
        """
        raise NotImplementedError(
            "Subclasses must implement the generate method"
        )
```
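For reference, a minimal sketch of how a subclass might use this interface. The `SquareNumbersPipeline` class, its data, and the output path are made up for illustration; only `BaseDataGenPipeline` comes from the diff above, and the sketch assumes it is in scope (e.g. defined in the same module).

```python
from typing import Any, Dict, List


class SquareNumbersPipeline(BaseDataGenPipeline):
    """Hypothetical subclass: squares the `x` field of each entry."""

    def generate(
        self, data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        # load_data accepts a file path, a JSONL string, or a list of dicts
        entries = self.load_data(data)
        return [{"input": e["x"], "output": e["x"] ** 2} for e in entries]


pipeline = SquareNumbersPipeline(output_path="./squares.json")
# execute() runs generate() and, because output_path is set, persists the
# results as {"results": [...]} via save_results().
results = pipeline.execute([{"x": 2}, {"x": 3}])
print(results)  # [{'input': 2, 'output': 4}, {'input': 3, 'output': 9}]
```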
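And a quick sketch of the input/output flexibility the base class provides; the rows and paths here are again purely illustrative.

```python
p = BaseDataGenPipeline()

# 1) A list of dictionaries is returned unchanged.
rows = p.load_data([{"q": "2+2", "a": "4"}])

# 2) A JSONL string is split into lines and parsed line by line.
rows = p.load_data('{"q": "2+2", "a": "4"}\n{"q": "3+3", "a": "6"}')

# 3) A string naming an existing file is read as a JSONL file
#    (os.path.exists() decides between the file and string branches).
# rows = p.load_data("seed_data.jsonl")  # illustrative path

# JSONL output: safe_write_jsonl() writes to "<path>.tmp" first and then
# os.replace()s it over the target, so an interrupted run cannot leave a
# half-written results file behind.
p.save_jsonl(rows, output_path="out/results.jsonl")
p.safe_write_jsonl(rows, output_path="out/results.jsonl")
```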