feat(dcp): dcp optimized s3reader for 2x faster and partial DCP loading #378
base: main
Conversation
if not isinstance(constructor, partial):
if isinstance(constructor, DCPOptimizedConstructor):
Same here - this feels pretty janky to me. What's this used for? Just debugging or to actually do something based on it?
User agent - agree this still feels janky.
- Update SequentialS3Reader to support partial reads (and added logs)
- New ListOfRangesS3Reader:
  - Coalesces ranges to form chunks of ranges (sketched below)
  - Manages ranged SequentialS3Reader instances for each chunk
  - Maps each read / readinto / seek request to the corresponding s3reader instance
- Integrate this reader into S3StorageReader (force ListOfRangesS3Reader for now) via S3ReaderConstructor params for list of ranges.
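The coalescing step can be pictured with a short sketch (illustrative only; the helper name and the `(start, end)` tuple representation are assumptions, not the connector's API):

```python
from typing import List, Tuple

def coalesce_ranges(
    ranges: List[Tuple[int, int]], max_gap_size: int
) -> List[Tuple[int, int]]:
    """Merge sorted, non-overlapping (start, end) byte ranges whose gaps are
    at most max_gap_size, producing one chunk (one ranged GET) per group."""
    chunks: List[Tuple[int, int]] = []
    for start, end in ranges:
        if chunks and start - chunks[-1][1] <= max_gap_size:
            # Close enough to the previous chunk: extend it instead of
            # issuing a separate request.
            chunks[-1] = (chunks[-1][0], max(chunks[-1][1], end))
        else:
            chunks.append((start, end))
    return chunks

# With a 16-byte gap tolerance, the first two ranges merge into one chunk.
print(coalesce_ranges([(0, 100), (110, 200), (1000, 1100)], max_gap_size=16))
# [(0, 200), (1000, 1100)]
```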
Add DCPListOfRangesConstructor and dcp_list_of_ranges() factory method to enable DCP range optimization through the reader_constructor parameter. Includes better range injection logic and support for both direct ListOfRanges usage and DCP optimization. Users can now opt in via: reader_constructor=S3ReaderConstructor.dcp_list_of_ranges()
- Type annotations, missing arguments / return statements, etc.
- Minor logic/name changes in list_of_ranges.py
- Very minor change to fix mypy error on test_user_agent.py
This commit improves performance of ListOfRangesS3Reader by up to 30% for DCP load:
- Remove dependency on SequentialS3Reader in favour of self-managed streams
- Implement direct stream management with per-group buffering
- Optimize read() method with no BytesIO buffer, assuming sequential reading (see the buffering sketch below)
- Enforce non-seekable behaviour to force sequential reading patterns

This implementation is now significantly faster for distributed checkpoint loading patterns while maintaining correctness for sequential access. It relies on the load ordering optimisation, which enforces sequential reading for read() operations, but it will not work with readinto() operations since those still have backward seek patterns.
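For illustration, a minimal sketch of the copy-avoiding buffering idea this commit moves towards (a hypothetical class; the real implementation differs):

```python
from typing import List

class ItemViewBufferSketch:
    """Illustrative only: keep memoryview segments of received chunks instead of
    copying them into a BytesIO, and serve read() calls sequentially."""

    def __init__(self) -> None:
        self._views: List[memoryview] = []  # item segments, in arrival order
        self._idx = 0                       # current segment index
        self._off = 0                       # offset within the current segment

    def append(self, chunk: bytes, start: int, end: int) -> None:
        # Store a zero-copy view of the relevant slice of the chunk.
        self._views.append(memoryview(chunk)[start:end])

    def read(self, size: int) -> bytes:
        out = bytearray()
        while size > 0 and self._idx < len(self._views):
            view = self._views[self._idx]
            take = view[self._off : self._off + size]
            out += take                     # the only copy happens here
            size -= len(take)
            self._off += len(take)
            if self._off == len(view):      # segment exhausted, move on
                self._idx += 1
                self._off = 0
        return bytes(out)

buf = ItemViewBufferSketch()
buf.append(b"hello world", 0, 5)   # keeps a view of b"hello"
buf.append(b"0123456789", 2, 6)    # keeps a view of b"2345"
print(buf.read(7))                 # b'hello23'
```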
- Add READER_TYPE_STRING_TO_CLASS to tst/conftest.py with dcp_optimized
- Remove the test_s3dataset_common.py variable and update other references
- Update e2e test to use the dcp_reader_constructor fixture to include range-based readers (albeit not optimized for DCP workloads)
- Add missing __init__.py files to make relative imports work
… handling
- Add comprehensive input validation for bucket, key, ranges, and max_gap_size
- Support Union[int, float] type for max_gap_size parameter to allow float("inf") (in constructor.py too)
- Filter zero-length ranges automatically during initialization (separate from validate/coalesce method)
- Improve range validation with distinct error messages for unsorted vs overlapping ranges (sketched below)
- Rewrite error handling with descriptive messages using consistent error prefix
- Change NotImplementedError to ValueError for size validation consistency
- Remove TODO comments:
  - Check if memoryview every time for safety
  - Unsorted ranges check is added
  - Keep validation check in dcp_optimized to keep all dcp_optimized reader logic together
  - Handling large offsets in _ItemViewBuffer could increase overhead; keep as local offsets for simplicity
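A rough sketch of the validation behaviour described in this commit (hypothetical helper; names and error texts are illustrative, not the actual code):

```python
from typing import List, Tuple

def validate_ranges(ranges: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    # Zero-length ranges are filtered up front rather than treated as errors.
    ranges = [(s, e) for s, e in ranges if e > s]
    for (s1, e1), (s2, _) in zip(ranges, ranges[1:]):
        if s2 < s1:
            # Distinct message for out-of-order input...
            raise ValueError("DCPOptimizedS3Reader: ranges must be sorted by start offset")
        if s2 < e1:
            # ...versus ranges that are sorted but overlap.
            raise ValueError("DCPOptimizedS3Reader: ranges must not overlap")
    return ranges
```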
- Add new unit test file with 5 test classes covering DCPOptimizedS3Reader functionality:
  - TestItemViewBuffer: zero-copy buffer operations
  - TestCreationAndValidation: dcp_optimized reader creation and parameter validation
  - TestValidateAndCoalesceRanges: range coalescing logic and validations
  - TestStreamManagement: stream management usage verification
  - TestReaderIO: public interface and sequential access enforcement
- Add edge case testing for float max_gap_size support in constructor tests
- Fix relative imports in e2e test files to use proper package paths
- Add type ignore comment for spy function in dcp_optimized tests

Resolves import errors introduced when adding __init__.py files to make test directories Python packages (for the READER_TYPE_STRING_TO_CLASS changes).
Add e2e integration test for DCPOptimizedS3Reader range coalescing behaviour with full and partial loading patterns and different max_gap_sizes.
Reverts non-DCP optimized reader changes to make the PR changes clearer:
- Revert "fix(tests): resolve e2e test import errors after adding __init__ files"
- Revert "test: place READER_TYPE_STRING_TO_CLASS in conftest"
- Revert a minor test escape sequence fix
- Add documentation to README, constructor, and DCPOptimizedS3Reader class
- Include class docstrings for S3FileSystem, S3StorageWriter, and S3StorageReader
- Update reader configurations in README with examples
- Use Sphinx-friendly formatting for docstrings
- Remove some unplanned TODOs and update some comments
- Restructure chunk processing into dedicated skip/take phases for clarity
  - Will reduce unneeded if checks throughout the loop
  - But will increase verbosity, with ~10 lines repeated in chunk processing
- Also update wrong comment from "Iterate through remaining items" to "Check next item"
| "black", | ||
| "mypy" | ||
| "mypy", | ||
| "importlib_metadata; python_version == '3.9'", # PyTorch 2.7.0+ DCP w/ Python 3.9 requires this module; for dcp_optimized reader unit tests |
Only for 3.9? Is this the earliest Python version we support now?
No - it was a PyTorch 2.7.0+ regression which required this package for Python 3.9.
Error for Python 3.9: ModuleNotFoundError: No module named 'importlib_metadata'
I haven't found the upstream issue, but I have added a conditional import.
Question: the problem I had with this was whether we can really add importlib_metadata to the test extra without adding it to our real dependencies. The current approach works because only DCP-related tests require this import, and only for PyTorch 2.7.0+ on Python 3.9; but I'm not 100% confident. I did wrap all DCP imports with if TYPE_CHECKING, which resolves this issue, but I haven't tested whether it works.
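For reference, the TYPE_CHECKING wrap looks roughly like this (the imported name is just one example of a DCP symbol needed only for annotations, not the full list used in the PR):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by type checkers, never at runtime, so PyTorch's
    # importlib_metadata requirement on Python 3.9 is not triggered.
    from torch.distributed.checkpoint.planner import ReadItem
```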
super().__init__(path)
self.fs = S3FileSystem(region, s3client_config=s3client_config, reader_constructor=reader_constructor)  # type: ignore
self._reader_constructor = reader_constructor or S3ReaderConstructor.default()
self.fs: S3FileSystem = S3FileSystem(  # type: ignore[assignment]
Why do we need a lint ignore here?
Type ignore was in original code. mypy errors:
# with type hint - self.fs: S3FileSystem = S3FileSystem(...
s3torchconnector/src/s3torchconnector/dcp/s3_file_system.py:351: error: Incompatible types in assignment (expression has type "S3FileSystem", base class "FileSystemReader" defined the type as "FileSystem") [assignment]
# without type hint - self.fs = S3FileSystem(...
s3torchconnector/src/s3torchconnector/dcp/s3_file_system.py:351: error: Incompatible types in assignment (expression has type "S3FileSystem", variable has type "FileSystem") [assignment]
The reference code has a file-wide mypy ignore-errors directive.
log = logging.getLogger(__name__)

DEFAULT_MAX_GAP_SIZE = 32 * 1024 * 1024  # TODO tune this default
TODO left in code
Good call.
This value should be roughly raw loading throughput (around 2500 MB/s) multiplied by first-byte latency (around 200 ms), so around 512 MB.
The docs also omit the max_gap_size parameter, since I expect most users to use the default value - but we need a solid default value first.
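As a back-of-the-envelope check of that sizing (numbers taken from the comment above, not measured constants in the library):

```python
throughput = 2500 * 1024 * 1024   # ~2500 MB/s raw loading throughput
first_byte_latency = 0.2          # ~200 ms time-to-first-byte
max_gap_size = throughput * first_byte_latency
print(max_gap_size / (1024 * 1024))  # 500.0 MB, i.e. roughly 512 MB
```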
) -> None:
    if not plan_items:
        return  # Allow lack of plan_items, for SequentialS3Reader fallbacks
Is this correct? Did we decide what we wanted to do in case this method was called multiple times?
def __call__(self, bucket: str, key: str, get_object_info, get_stream) -> S3Reader:
    for relative_path in self._item_ranges_by_file.keys():
Nit: no need for .keys() call
# Otherwise, we're still in same group - reuse stream created when reading 1st item
if self._stream is None:
    raise ValueError(
Is this actually a problem or does it just not come up?
It doesn't come up; the None check is partly for mypy lint, and partly for the extremely rare case self._stream somehow gets deallocated.
self._stream = self._get_stream(group.start, group.end)
self._stream_pos = group.start
self._leftover = None
return self._stream
I feel there should be a subclass/dataclass just for handling the attributes on the stream
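Something along these lines, perhaps (a hypothetical sketch of the suggested grouping, not existing code):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class _GroupStream:
    """Bundles the per-group stream state currently spread across attributes."""
    stream: object                          # ranged GET stream for the current group
    position: int                           # absolute offset of the next unread byte
    leftover: Optional[memoryview] = None   # unconsumed tail of the last chunk
```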
return self._stream

def _get_item_buffer(self, item: ItemRange) -> _ItemViewBuffer:
If we can refactor self._stream to be its own class, I think we can make this more readable
Good suggestion! Will have a look.
access across DCP items (sequential item access required).

Args:
    size (int | None): how many bytes to read.
Not consistent
Ah - that was because I wanted to allow attempts to read(None) or read(-1) (full file read attempts) to pass the read() call but give them a descriptive ValueError("Size cannot be None; full read not supported") error later on.
item = self._find_item_for_position(self._position)

if item is not self._current_item or self._current_item_buffer is None:
This feels sketchy
self._current_item_buffer is None is mainly for the first item loaded.
The logic is "if the item has changed, load the new item into the buffer and read from it" - I can add a comment.
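For example, the comment could look roughly like this (the assignment lines below are assumptions about the surrounding code, not part of the diff):

```python
# A new item has been reached (or this is the very first read, so no buffer
# exists yet): load that item's bytes into a fresh buffer before serving reads.
if item is not self._current_item or self._current_item_buffer is None:
    self._current_item_buffer = self._get_item_buffer(item)
    self._current_item = item
```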
Description
DCPOptimizedS3Reader provides up to 2x performance improvement for PyTorch Distributed Checkpoint (DCP) loading through three key optimizations:
- Coalescing the byte ranges from the DCP load plan into larger chunks, reducing the number of S3 requests
- Direct stream management with per-group buffering and enforced sequential access
- _ItemViewBuffer using memoryview segments to eliminate BytesIO copies and allocation overhead (~ -30% time)

This reader can double DCP loading performance, and even more when loading only parts of the checkpoint (the performance boost varies across checkpoints).
Usage:
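A sketch of the opt-in described in this PR; import paths, parameter names, and the state-dict placeholder are assumptions based on the description, not verified code:

```python
import torch.distributed.checkpoint as dcp
from s3torchconnector import S3ReaderConstructor
from s3torchconnector.dcp import S3StorageReader

state_dict = my_model.state_dict()  # placeholder: the state dict to load into

# Opt in to the DCP-optimized reader via reader_constructor.
reader = S3StorageReader(
    region="us-east-1",
    path="s3://my-bucket/checkpoints/epoch_5/",
    reader_constructor=S3ReaderConstructor.dcp_optimized(),
)
dcp.load(state_dict, storage_reader=reader)
```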
Additional context
Changes Made:
- New DCPOptimizedS3Reader with _ItemViewBuffer; reads must be sequential across each ReadItem, and this is enforced via the Load Ordering PR.
- DCP integration: S3StorageReader automatically injects range metadata from DCP load plans when providing dcp_optimized() as reader_constructor, through the S3StorageReader.prepare_local_plan() method.
- Update unit/integration tests and documentation to cover the new reader.
I have updated the CHANGELOG or README if appropriate
Related items
Testing
Unit / integration tests and benchmarks on Llama models.
By submitting this pull request, I confirm that my contribution is made under the terms of BSD 3-Clause License and I agree to the terms of the LICENSE.