feat: Support conversion between dataproto and batchmeta #24
d802f0c into TransferQueue:main_tq_submodule
Pull Request Overview
This PR implements conversion utilities between BatchMeta and DataProto objects to enable data transfer through the TransferQueue system. The changes replace direct server info management with a client-based approach and introduce automatic data transformation.
- Introduces conversion functions between BatchMeta and DataProto formats
- Replaces server info management with transfer queue client creation
- Adds automatic conversion pipeline through a decorator that integrates with the register decorator
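The conversion pipeline described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `BatchMetaStub`, `DataProtoStub`, `_STORE`, `meta_to_proto`, `proto_to_meta`, and `auto_convert` are all hypothetical stand-ins, and an in-memory dict takes the place of the TransferQueue client. It only shows the general shape of a decorator that swaps metadata handles for payloads before a call and writes the result back afterwards.

```python
from dataclasses import dataclass, field
from functools import wraps
from typing import Any, Callable, Dict


@dataclass
class BatchMetaStub:
    """Hypothetical stand-in for BatchMeta: a handle pointing at stored data."""
    key: str


@dataclass
class DataProtoStub:
    """Hypothetical stand-in for DataProto: the materialized payload."""
    payload: Dict[str, Any] = field(default_factory=dict)


# Hypothetical in-memory store standing in for the TransferQueue client.
_STORE: Dict[str, DataProtoStub] = {}


def meta_to_proto(meta: BatchMetaStub) -> DataProtoStub:
    """Fetch the payload that a metadata handle refers to."""
    return _STORE[meta.key]


def proto_to_meta(proto: DataProtoStub, key: str) -> BatchMetaStub:
    """Store a payload and return a metadata handle for it."""
    _STORE[key] = proto
    return BatchMetaStub(key=key)


def auto_convert(func: Callable[..., DataProtoStub]) -> Callable[..., Any]:
    """Convert positional metadata arguments to payloads, then convert the result back."""
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        metas = [a for a in args if isinstance(a, BatchMetaStub)]
        if not metas:
            # No metadata handles involved: call through unchanged.
            return func(*args, **kwargs)
        new_args = [meta_to_proto(a) if isinstance(a, BatchMetaStub) else a for a in args]
        result = func(*new_args, **kwargs)
        # Write the result back under the first input handle's key.
        return proto_to_meta(result, metas[0].key)
    return wrapper


@auto_convert
def double_values(data: DataProtoStub) -> DataProtoStub:
    """Example worker method operating on the materialized payload."""
    return DataProtoStub({k: v * 2 for k, v in data.payload.items()})
```

In this sketch a caller that passes a handle gets a handle back, while a caller that passes a payload directly is unaffected, so the decorated method body never needs to know which path was taken.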
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
verl/utils/transferqueue_utils.py | Implements core conversion functions and client management utilities |
verl/single_controller/base/worker.py | Removes deprecated transfer queue server info method |
verl/single_controller/base/decorator.py | Integrates automatic conversion pipeline into register decorator |
recipe/transfer_queue/ray_trainer.py | Updates to use new client creation approach |
recipe/transfer_queue/megatron_workers.py | New worker implementations with transfer queue client support |
recipe/transfer_queue/fsdp_workers.py | New worker implementations with transfer queue client support |
recipe/transfer_queue/main_ppo.py | Updates worker imports to use transfer queue versions |
    if data.batch is not None:
        result_dict.update(data.batch)

    batch_size = data.batch.batch_size if data.batch is not None else (len(list(data.non_tensor_batch.values())[0]),)
There is trailing whitespace at the end of line 85 that should be removed.
Copilot uses AI. Check for mistakes.
This line will raise an IndexError if `data.non_tensor_batch` is empty. It should check that `data.non_tensor_batch` exists and has values before accessing `[0]`.
Suggested change:

    - batch_size = data.batch.batch_size if data.batch is not None else (len(list(data.non_tensor_batch.values())[0]),)
    + if data.batch is not None:
    +     batch_size = data.batch.batch_size
    + elif data.non_tensor_batch is not None and len(data.non_tensor_batch) > 0:
    +     first_value = next(iter(data.non_tensor_batch.values()))
    +     if hasattr(first_value, '__len__') and len(first_value) > 0:
    +         batch_size = (len(first_value),)
    +     else:
    +         batch_size = (0,)
    + else:
    +     batch_size = (0,)
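The fallback logic in this suggestion can be exercised in isolation. The sketch below is not code from the PR: `infer_batch_size` is a hypothetical helper that takes the tensor batch size and the non-tensor mapping as plain arguments, so it runs without `DataProto` or tensors. It collapses the nested `len(first_value) > 0` check, since an empty first value already yields `(0,)`.

```python
from typing import Any, Mapping, Optional, Tuple


def infer_batch_size(
    batch_size: Optional[Tuple[int, ...]],
    non_tensor_batch: Optional[Mapping[str, Any]],
) -> Tuple[int, ...]:
    """Prefer the tensor batch size; otherwise infer it from the first non-tensor field.

    Falls back to (0,) when there is nothing to infer from.
    """
    if batch_size is not None:
        return tuple(batch_size)
    if non_tensor_batch:  # False for both None and an empty mapping
        first_value = next(iter(non_tensor_batch.values()))
        if hasattr(first_value, "__len__"):
            return (len(first_value),)
    return (0,)


# The empty and missing cases no longer raise.
print(infer_batch_size(None, {}))
print(infer_batch_size(None, None))
print(infer_batch_size(None, {"ids": ["a", "b", "c"]}))
```

Whether `(0,)` is the right fallback, as opposed to raising a descriptive error, depends on how downstream code treats an empty batch; the suggestion above picks the silent fallback.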
        storage_infos=storage_infos,
    )
There is unnecessary trailing whitespace after line 32 that should be removed.
        controller_infos=controller_infos,
        storage_infos=storage_infos,
    )
There is unnecessary trailing whitespace after line 52 that should be removed.
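Trailing-whitespace findings like the ones above can be reproduced locally before review. The helper below is a hypothetical illustration, not part of the PR or of any linter: `find_trailing_whitespace` simply reports lines that end in spaces or tabs.

```python
from typing import List, Tuple


def find_trailing_whitespace(text: str) -> List[Tuple[int, str]]:
    """Return (1-based line number, line) for every line ending in spaces or tabs."""
    hits = []
    for number, line in enumerate(text.splitlines(), start=1):
        if line != line.rstrip(" \t"):
            hits.append((number, line))
    return hits


sample = "a = 1\nb = 2  \n\t\nc = 3\n"
for number, line in find_trailing_whitespace(sample):
    print(f"line {number}: {line!r}")
```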
What does this PR do?
Checklist Before Starting
- `[{modules}] {type}: {description}` (This will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`
- Separate multiple modules with `,` like `[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- Add `[BREAKING]` to the beginning of the title for breaking changes, e.g. `[BREAKING][fsdp, megatron] feat: dynamic batching`
Test
API and Usage Example
# Add code snippet or script demonstrating how to use this
Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Run the pre-commit checks: `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Request CI in the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)