
Conversation


@LLLLxmmm LLLLxmmm commented Sep 28, 2025

What does this PR do?

Add a concise overview of what this PR aims to achieve. Reference related GitHub issues and PRs that help with the review.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

For changes that cannot be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results such as training curve plots, evaluation results, etc.

API and Usage Example

Demonstrate how the API changes if any, and provide usage example(s) if possible.

# Add code snippet or script demonstrating how to use this

Design & Code Changes

Demonstrate the high-level design if this PR is complex, and list the specific changes.

Checklist Before Submitting

Important

Please check all the following items before requesting a review; otherwise the reviewer may deprioritize this PR.

@LLLLxmmm LLLLxmmm changed the title Add TransferQueue recipe to veRL [recipe] feat: Integrate TransferQueue into RayTrainer Sep 28, 2025

def _initialize_data_system(self):
    num_n_samples = self.config.actor_rollout_ref.rollout.n
    # 1. Initialize TransferQueueStorage
Author:

TODO: translate the comments into English

)
)
log_rollout_meta.reorder(balanced_idx)
self._log_rollout_data(log_rollout_meta, reward_extra_infos_dict, timing_raw, rollout_data_dir)
Author:

TODO: clear the metadata after each step finishes

@0oshowero0 0oshowero0 requested a review from Copilot September 29, 2025 01:05

@Copilot Copilot AI left a comment


Pull Request Overview

This PR integrates the TransferQueue data system into RayTrainer to enable distributed data management for PPO training. The integration replaces the traditional DataProto-based data handling with BatchMeta objects that serve as metadata references to data stored in the TransferQueue system.

  • Adds TransferQueue system initialization and client setup for distributed data storage and management
  • Updates data handling throughout the training pipeline to work with BatchMeta metadata objects
  • Extends decorator functions to support BatchMeta type checking and operations alongside existing DataProto support
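
As a rough illustration of the dual DataProto/BatchMeta handling described above, a minimal type-check sketch is shown below. This is an assumption about the shape of the change, not the actual code in decorator.py; the helper name _validate_dispatch_arg and the BatchMeta import path are hypothetical.

from verl.protocol import DataProto
from transfer_queue import BatchMeta  # assumed import path for the TransferQueue package

def _validate_dispatch_arg(data):
    # Accept either a concrete DataProto batch or a BatchMeta metadata reference
    # pointing at data held in the TransferQueue storage units.
    if isinstance(data, (DataProto, BatchMeta)):
        return data
    raise TypeError(f"expected DataProto or BatchMeta, got {type(data)}")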

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

File summary:
  • verl/single_controller/base/decorator.py: Extended type checking and operations to support BatchMeta objects in data processing functions
  • recipe/transfer_queue/run_qwen3-8b_transferqueue_npu.sh: Added an NPU-specific training script with TransferQueue configuration parameters
  • recipe/transfer_queue/ray_trainer.py: New RayTrainer implementation with full TransferQueue integration for distributed PPO training
  • recipe/transfer_queue/main_ppo.py: Main entry point for PPO training with TransferQueue support and worker initialization
  • recipe/transfer_queue/config/transfer_queue_ppo_trainer.yaml: Hydra configuration file extending the base PPO trainer config
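
As a rough sketch of how such a recipe config is typically consumed (assuming the standard Hydra entry-point pattern; the actual main_ppo.py may differ):

import hydra
from omegaconf import OmegaConf

@hydra.main(config_path="config", config_name="transfer_queue_ppo_trainer", version_base=None)
def main(config):
    # Print the resolved config; the real entry point would launch PPO training here.
    print(OmegaConf.to_yaml(config, resolve=True))

if __name__ == "__main__":
    main()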


Comment on lines +364 to +394
# 1. Initialize TransferQueueStorage
total_storage_size = self.config.data.train_batch_size * self.config.trainer.num_global_batch * num_n_samples
self.data_system_storage_units = {}
storage_placement_group = get_placement_group(self.config.trainer.num_data_storage_units, num_cpus_per_actor=1)
for storage_unit_rank in range(self.config.trainer.num_data_storage_units):
    # TransferQueueStorage is launched via Ray; it is a class decorated with ray.remote
    storage_node = TransferQueueStorageSimpleUnit.options(
        placement_group=storage_placement_group, placement_group_bundle_index=storage_unit_rank
    ).remote(storage_size=math.ceil(total_storage_size / self.config.trainer.num_data_storage_units))
    self.data_system_storage_units[storage_unit_rank] = storage_node
    logging.info(f"TransferQueueStorageSimpleUnit #{storage_unit_rank} has been created.")

# 2. Initialize the TransferQueueController instances
# Multiple controller instances are supported here for load balancing and large-scale scale-out; different controllers can be assigned to different RL computation tasks
self.data_system_controllers = {}
controller_placement_group = get_placement_group(self.config.trainer.num_data_controllers, num_cpus_per_actor=1)
for controller_rank in range(self.config.trainer.num_data_controllers):
    self.data_system_controllers[controller_rank] = TransferQueueController.options(
        placement_group=controller_placement_group, placement_group_bundle_index=controller_rank
    ).remote(
        num_storage_units=self.config.trainer.num_data_storage_units,
        global_batch_size=self.config.data.train_batch_size,
        num_global_batch=self.config.trainer.num_global_batch,
        num_n_samples=num_n_samples,
    )
    logging.info(f"TransferQueueController #{controller_rank} has been created.")

# 3. Register the controllers with each storage unit
# Each storage unit receives the handles of all controllers, obtains the corresponding IP and port via Ray, and then establishes ZMQ sockets for message transfer
self.data_system_controller_infos = process_zmq_server_info(self.data_system_controllers)
self.data_system_storage_unit_infos = process_zmq_server_info(self.data_system_storage_units)

Copilot AI Sep 29, 2025


[nitpick] The comments are in Chinese. Consider translating them to English for consistency with the rest of the codebase, which uses English comments.


self.data_system_client = AsyncTransferQueueClient(
    client_id="Trainer",
    controller_infos=self.data_system_controller_infos[0],
    # TODO: 主控Client感知所有controller,WorkerGroup和Worker的Client感知一个controller

Copilot AI Sep 29, 2025


[nitpick] The TODO comment is in Chinese. Consider translating to English: '# TODO: Main controller Client should be aware of all controllers, WorkerGroup and Worker Clients should be aware of one controller'

Suggested change
# TODO: 主控Client感知所有controller,WorkerGroup和Worker的Client感知一个controller
# TODO: Main controller Client should be aware of all controllers, WorkerGroup and Worker Clients should be aware of one controller


Comment on lines +1178 to +1197
# if self.reward_fn is None:
# raise ValueError("A reward_fn is required for REMAX advantage estimation.")
#
# with marked_timer("gen_max", timing_raw, color="purple"):
# gen_baseline_meta = deepcopy(gen_meta)
# gen_baseline_meta.extra_info["do_sample"] = False
# if not self.async_rollout_mode:
# gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_meta)
# else:
# gen_baseline_output = self.async_rollout_manager.generate_sequences(gen_baseline_meta)
# batch = batch.union(gen_baseline_output)
# reward_baseline_tensor = self.reward_fn(batch)
# reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)
#
# batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
#
# batch.batch["reward_baselines"] = reward_baseline_tensor
#
# del gen_baseline_batch, gen_baseline_output


Copilot AI Sep 29, 2025


Large commented-out code block should be removed or properly implemented. If this functionality is needed, consider implementing it properly or moving it to a separate issue for future development.

Suggested change: delete the commented-out block above, leaving only a short placeholder comment such as "# (Commented-out block removed)".


Comment on lines +1245 to +1249
# TODO: (transferqueue) (zjj)
if self.config.reward_model.launch_reward_fn_async:
    future_reward = compute_reward_async.remote(data=batch, reward_fn=self.reward_fn)
else:
    reward_tensor, reward_extra_infos_dict = compute_reward(batch, self.reward_fn)

Copilot AI Sep 29, 2025


TODO comment with initials should be either resolved or made more descriptive. Consider creating a proper issue or implementing the missing functionality.
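
For reference, in the upstream verl ray_trainer this async branch is normally paired with a later ray.get call once the reward is actually needed; a minimal sketch under the assumption that the same pattern applies here (class fragment, assumes import ray and the surrounding trainer):

# Later in the training step, after other work has overlapped with the async reward computation:
if self.config.reward_model.launch_reward_fn_async:
    reward_tensor, reward_extra_infos_dict = ray.get(future_reward)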


Comment on lines +1356 to +1361
# TODO: (transferqueue) (zjj)
if self.config.algorithm.use_kl_in_reward:
    batch, kl_metrics = apply_kl_penalty(
        batch, kl_ctrl=self.kl_ctrl_in_reward, kl_penalty=self.config.algorithm.kl_penalty
    )
    metrics.update(kl_metrics)

Copilot AI Sep 29, 2025


TODO comment with initials should be either resolved or made more descriptive. Consider creating a proper issue or implementing the missing functionality.


Comment on lines +1392 to +1401
# TODO: (transferqueue) (zjj)
batch = compute_advantage(
    batch,
    adv_estimator=self.config.algorithm.adv_estimator,
    gamma=self.config.algorithm.gamma,
    lam=self.config.algorithm.lam,
    num_repeat=self.config.actor_rollout_ref.rollout.n,
    norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
    config=self.config.algorithm,
)

Copilot AI Sep 29, 2025


TODO comment with initials should be either resolved or made more descriptive. Consider creating a proper issue or implementing the missing functionality.


@0oshowero0 0oshowero0 merged commit acbd595 into TransferQueue:main_tq_submodule Sep 29, 2025
3 of 4 checks passed