Skip to content

Conversation

0oshowero0
Copy link

What does this PR do?

This PR introduces the TransferQueue data management module to verl, aiming to accelerate experience data transfer and address performance bottlenecks in post-training systems. Detailed design rationale is available in our RFC (#2662).

This PR adds TransferQueue as a git submodule into verl/experimental/transfer_queue. Besides, we provide end-to-end scripts that integrate verl with TransferQueue.

TransferQueue is a high-performance data storage and transfer module with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows (in progress).

The system will introduce the following core components:

  • TransferQueueClient: Deployed on each Worker, manages the communication with TransferQueue system via simple put/get semantics.

  • TransferQueueController: Centralized dataflow scheduler tracking the production and consumption status of training samples.

  • TransferQueueStorage: Distributed storage units that holds the actual experience data.

The primary motivation for integrating TransferQueue to verl now is to alleviate the data transfer bottleneck of the single controller RayPPOTrainer. Currently, all DataProto objects must be routed through RayPPOTrainer, resulting in a single point bottleneck of the whole post-training system.

verl_dataflow_DataProto

Leveraging TransferQueue, we separate experience data transfer from metadata dispatch by

  • Replacing DataProto with BatchMeta (metadata) and TensorDict (actual data) structures
  • Preserving verl's original Dispatch/Collect logic via BatchMeta (maintaining single-controller debuggability)
  • Accelerating data transfer by TransferQueue's distributed storage units

verl_dataflow_TransferQueue

For WorkerGroup class, we hide the above translation process by decorator. For AgentLoop related class, we explicitely do the adaption in AgentLoopBase.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

We've validated TransferQueue functionality through

  • Unit test of (Async)TransferQueueClient, TransferQueueController, and TransferQueueSimpleUnit
  • End-to-end demo that mimics the usage in verl

API and Usage Example

The primary interaction points are AsyncTransferQueueClient and TransferQueueClient, serving as the communication interface with the TransferQueue system.

Core client interfaces:

  • (async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta
  • (async_)get_data(metadata:BatchMeta) -> TensorDict
  • (async_)put(data:TensorDict, metadata:BatchMeta, global_step)
  • (async_)clear(global_step: int)

You may refer to the example here, where we mimics the verl usage in both async & sync scenarios:
https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case.


For verl integration, we put TransferQueue as a submodule in verl/experimental/transfer_queue. So please run the following git command first:

git submodule init
git submodule update

Then you can try our recipe (still in developing).

Design & Code Changes

Refer to our Paper, RFC, and Zhihu post :)

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

FightingZhen and others added 29 commits September 23, 2025 19:56
* Support controller in TransferQueue

* Fix import

* Fix comments

---------

Co-authored-by: liuximeng <[email protected]>
Added copyright and licensing information to the controller.py file.
* update client docstring

Signed-off-by: 0oshowero0 <[email protected]>

* fix n_sample related problems

Signed-off-by: 0oshowero0 <[email protected]>

---------

Signed-off-by: 0oshowero0 <[email protected]>
* Add metadata.py and test_simple_storage_unit.py

* Add copyright and license information to test_simple_storage_unit.py

* Apply suggestion from @Copilot

Co-authored-by: Copilot <[email protected]>

---------

Co-authored-by: Han Zhenyu 韩振宇 <[email protected]>
Co-authored-by: Copilot <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
* Origin recipe

* Integrate TransferQueue with Ray Trainer

* Fix codecheck

* Fix codecheck

* Fix codecheck

* Fix codecheck

* Fix

* Fix codecheck

---------

Co-authored-by: liuximeng <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
Signed-off-by: 0oshowero0 <[email protected]>
* fix chinese comments & add TODO

* provide general DataProto<->BatchMeta decorator

Signed-off-by: 0oshowero0 <[email protected]>

* fix

Signed-off-by: 0oshowero0 <[email protected]>

* fix

Signed-off-by: 0oshowero0 <[email protected]>

* fix

Signed-off-by: 0oshowero0 <[email protected]>

* optimize code

Signed-off-by: 0oshowero0 <[email protected]>

* fix

Signed-off-by: 0oshowero0 <[email protected]>

* fix

Signed-off-by: 0oshowero0 <[email protected]>

---------

Signed-off-by: 0oshowero0 <[email protected]>
@0oshowero0
Copy link
Author

Current Progress:

  • BatchMeta can be successfully dispatched to each Worker
  • BatchMeta <-> DataProto bidirectional conversion is ready, tested by generate_sequence when async_rollout_mode=False
  • Support all WorkerGroup classes
  • Support AgentLoopManager class (async_rollout_mode=True)
  • RayPPOTrainer single controller adaption
  • E2E long run

@ji-huazhong ji-huazhong force-pushed the main_tq_submodule branch 11 times, most recently from 65affa2 to eb31070 Compare September 30, 2025 07:55
ji-huazhong and others added 2 commits September 30, 2025 16:48
* feat: Support conversion between dataproto and batchmeta

* update
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants