-
Notifications
You must be signed in to change notification settings - Fork 4
Support controller in TransferQueue #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a new TransferQueueController class that enables controller functionality in the TransferQueue system. The controller manages data production and consumption status across distributed storage units, coordinating data flow between producers and consumers through ZMQ-based communication.
- Adds centralized controller for managing data status and metadata across storage units
- Implements ZMQ-based communication protocols for handshaking, request handling, and status updates
- Provides data sampling and batch management capabilities with support for multiple consumption modes
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
|
||
TQ_CONTROLLER_GET_METADATA_TIMEOUT = int(os.environ.get("TQ_CONTROLLER_GET_METADATA_TIMEOUT", 300)) | ||
TQ_CONTROLLER_GET_METADATA_CHECK_INTERVAL = int(os.environ.get("TQ_CONTROLLER_GET_METADATA_CHECK_INTERVAL", 1)) | ||
TQ_INIT_FIELD_NUM = os.environ.get("TQ_INIT_FIELD_NUM", 10) |
Copilot
AI
Sep 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Environment variable TQ_INIT_FIELD_NUM is retrieved as a string but used as an integer. This will cause a TypeError when used in tensor operations. Convert to int: int(os.environ.get('TQ_INIT_FIELD_NUM', 10))
TQ_INIT_FIELD_NUM = os.environ.get("TQ_INIT_FIELD_NUM", 10) | |
TQ_INIT_FIELD_NUM = int(os.environ.get("TQ_INIT_FIELD_NUM", 10)) |
Copilot uses AI. Check for mistakes.
elif request_msg.request_type == ZMQRequestType.NOTIFY_DATA_UPDATE_ERROR: | ||
# Handle data update errors | ||
error_msg = request_msg.body.get("message", "Unknown error") | ||
print(f"Data update error from storage: {error_msg}") |
Copilot
AI
Sep 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using print() for error logging is inconsistent with the established logging pattern used throughout the file. Replace with logger.error(f'Data update error from storage: {error_msg}')
print(f"Data update error from storage: {error_msg}") | |
logger.error(f"Data update error from storage: {error_msg}") |
Copilot uses AI. Check for mistakes.
data_fields: list[str], | ||
batch_size: int, | ||
mode: str = "fetch", | ||
global_step=0, |
Copilot
AI
Sep 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The default value of 0 for global_step parameter could be problematic as it's not clear if this is a valid default for all use cases. Consider making this a required parameter or use None to indicate it must be explicitly provided.
Copilot uses AI. Check for mistakes.
current_fields = self.data_production_status.shape[1] | ||
# Expand data status matrix if needed | ||
if len(self.field_name_mapping) + needed_fields > current_fields: | ||
add_fields = max(TQ_INIT_FIELD_NUM, needed_fields + 1) |
Copilot
AI
Sep 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TQ_INIT_FIELD_NUM is a string from environment variable but used in max() comparison with integers. This will cause a TypeError. This is a secondary issue stemming from the type conversion problem in line 39.
Copilot uses AI. Check for mistakes.
if mode == "insert": | ||
# TODO: Currently only supports putting entire GBS data, need to extend to support multiple puts to same | ||
# step | ||
assert batch_size == self.global_batch_size |
Copilot
AI
Sep 23, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assert statement for batch_size validation should include a descriptive error message to help with debugging: assert batch_size == self.global_batch_size, f'batch_size {batch_size} must equal global_batch_size {self.global_batch_size}'
assert batch_size == self.global_batch_size | |
assert batch_size == self.global_batch_size, f"batch_size {batch_size} must equal global_batch_size {self.global_batch_size}" |
Copilot uses AI. Check for mistakes.
* Support controller in TransferQueue * Fix import * Fix comments --------- Co-authored-by: liuximeng <[email protected]>
What does this PR do?
Checklist Before Starting
[{modules}] {type}: {description}
(This will be checked by the CI){modules}
includefsdp
,megatron
,sglang
,vllm
,rollout
,trainer
,ci
,training_utils
,recipe
,hardware
,deployment
,ray
,worker
,single_controller
,misc
,perf
,model
,algo
,env
,tool
,ckpt
,doc
,data
,
like[megatron, fsdp, doc]
{type}
is infeat
,fix
,refactor
,chore
,test
[BREAKING]
to the beginning of the title.[BREAKING][fsdp, megatron] feat: dynamic batching
Test
API and Usage Example
# Add code snippet or script demonstrating how to use this
Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
ci-request
channel in theverl
Slack workspace. (If not accessible, please try the Feishu group (飞书群).)