Skip to content

Add exclude parameter to S3 pull functionality #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/art/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from art.utils.s3 import (
pull_model_from_s3,
push_model_to_s3,
ExcludableOption,
)


Expand Down Expand Up @@ -443,6 +444,7 @@ async def _experimental_pull_from_s3(
prefix: str | None = None,
verbose: bool = False,
delete: bool = False,
exclude: list[ExcludableOption] | None = None,
) -> None:
"""Download the model directory from S3 into local Backend storage. Right now this can be used to pull trajectory logs for processing or model checkpoints.
Args:
Expand All @@ -452,7 +454,9 @@ async def _experimental_pull_from_s3(
prefix: The prefix to pull from S3. If None, the model name will be used.
verbose: Whether to print verbose output.
delete: Whether to delete the local model directory.
exclude: List of directories to exclude from sync. Valid options: "checkpoints", "logs", "trajectories".
"""

await pull_model_from_s3(
model_name=model.name,
project=model.project,
Expand All @@ -462,6 +466,7 @@ async def _experimental_pull_from_s3(
verbose=verbose,
delete=delete,
art_path=self._path,
exclude=exclude,
)

async def _experimental_push_to_s3(
Expand Down
1 change: 1 addition & 0 deletions src/art/utils/benchmarking/load_trajectories.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ async def pull_model_trajectories(model: ArtModel) -> None:
model,
s3_bucket=bucket,
verbose=True,
exclude=["checkpoints", "logs"],
)

print("Finished pulling trajectories.", flush=True)
16 changes: 14 additions & 2 deletions src/art/utils/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
from asyncio.subprocess import DEVNULL
import tempfile
from typing import Optional, Sequence
from typing import Optional, Sequence, Literal
import zipfile

from art.errors import ForbiddenBucketCreationError
Expand All @@ -17,6 +17,8 @@

__all__: Sequence[str] = ("s3_sync",)

ExcludableOption = Literal["checkpoints", "logs", "trajectories"]


class S3SyncError(RuntimeError):
"""Raised when the underlying *aws s3 sync* command exits with a non‑zero status."""
Expand Down Expand Up @@ -67,6 +69,7 @@ async def s3_sync(
profile: Optional[str] = None,
verbose: bool = False,
delete: bool = False,
exclude: list[ExcludableOption] | None = None,
) -> None:
"""Synchronise *source* and *destination* using the AWS CLI.

Expand All @@ -82,6 +85,7 @@ async def s3_sync(
profile: Optional AWS profile name to pass to the CLI.
verbose: When *True*, the output of the AWS CLI is streamed to the
calling process; otherwise it is suppressed.
exclude: List of directories to exclude from sync.

Raises:
S3SyncError: If the *aws s3 sync* command exits with a non‑zero status.
Expand All @@ -100,6 +104,12 @@ async def s3_sync(

if delete:
cmd.append("--delete")

# Add exclude patterns for each excluded directory
if exclude:
for excluded_dir in exclude:
cmd.extend(["--exclude", f"{excluded_dir}/*"])

cmd += [source, destination]

# Suppress output unless verbose mode is requested.
Expand Down Expand Up @@ -156,6 +166,7 @@ async def pull_model_from_s3(
verbose: bool = False,
delete: bool = False,
art_path: str | None = None,
exclude: list[ExcludableOption] | None = None,
) -> str:
"""Pull a model from S3 to the local directory.

Expand All @@ -169,6 +180,7 @@ async def pull_model_from_s3(
calling process; otherwise it is suppressed.
delete: When *True*, delete the local model directory if it exists.
art_path: The path to the ART directory.
exclude: List of directories to exclude from sync.

Returns:
The local directory path.
Expand Down Expand Up @@ -196,7 +208,7 @@ async def pull_model_from_s3(
prefix=prefix,
)
await ensure_bucket_exists(s3_bucket)
await s3_sync(s3_path, local_dir, verbose=verbose, delete=delete)
await s3_sync(s3_path, local_dir, verbose=verbose, delete=delete, exclude=exclude)

# After pulling, migrate to new structure if needed
if step is not None:
Expand Down