Commits (78 total; changes shown from 69 commits)
cf00afa
Added dynamic filter
PrinsYin Jul 26, 2025
d800627
Addedrunner
PrinsYin Jul 26, 2025
9bb815f
dynamic filter
PrinsYin Jul 26, 2025
f5ffc02
dynamic filter no kl
PrinsYin Jul 26, 2025
20a822c
extra info logic update
PrinsYin Jul 26, 2025
78b44f7
logging and run script
PrinsYin Jul 27, 2025
ecf1c21
added metric train reward
PrinsYin Jul 28, 2025
72eb9a9
1
PrinsYin Aug 7, 2025
82fb1ae
1
PrinsYin Aug 7, 2025
1ce98dd
1
PrinsYin Aug 7, 2025
df7f92d
Merge mainline
Hecate0821 Aug 9, 2025
c33d341
Add script for Qwen3-4b dapo
Hecate0821 Aug 9, 2025
e57fb25
Clean Up
Hecate0821 Aug 9, 2025
9644809
Fix Pre-commit
Hecate0821 Aug 9, 2025
8a4ba8f
Fix: Comments
Hecate0821 Aug 12, 2025
a2bfbbf
Fix:naming
Hecate0821 Aug 17, 2025
e539fde
merge main
Hecate0821 Aug 17, 2025
2ac9fcb
Merge branch 'main' into ds_nokl
zhaochenyang20 Aug 25, 2025
8442353
Add filter for all negative and positive
PrinsYin Aug 11, 2025
a527b77
Fix pre-commit
Hecate0821 Aug 25, 2025
d26af35
Fix script
Hecate0821 Aug 25, 2025
d4eefac
Fix script
Hecate0821 Aug 25, 2025
55d87ad
Add config in megatron yaml
Hecate0821 Aug 25, 2025
4aa7087
Fix config
Hecate0821 Aug 25, 2025
ddfdd8f
Fix pre-commit
PrinsYin Aug 25, 2025
3f478c9
Fix CI
PrinsYin Aug 25, 2025
235c12d
1
PrinsYin Aug 26, 2025
09d60cb
1
PrinsYin Aug 26, 2025
4f46906
1
PrinsYin Aug 26, 2025
7df117b
refactor
PrinsYin Aug 26, 2025
a8692b2
refactor
PrinsYin Aug 26, 2025
1ae4dbe
refactor
PrinsYin Aug 26, 2025
cd14168
refactor
PrinsYin Aug 26, 2025
19c4fd3
Fix script
Hecate0821 Aug 26, 2025
6c96fe6
script
PrinsYin Aug 27, 2025
7ff23e9
script
PrinsYin Aug 27, 2025
a9ae9e6
Make filter a class
Hecate0821 Aug 27, 2025
703f783
Fix config
Hecate0821 Aug 27, 2025
d4aeb94
Merge branch 'main' into ds_nokl
zhaochenyang20 Aug 27, 2025
09a58a5
moved filter
PrinsYin Aug 28, 2025
d45d5b6
Fix traienr
Hecate0821 Aug 28, 2025
fc935ff
fixed improt
PrinsYin Aug 28, 2025
1adbf67
Fix add back init
Hecate0821 Aug 28, 2025
c582114
Fix naming
Hecate0821 Aug 28, 2025
ef493a5
Fix precommit
Hecate0821 Aug 28, 2025
5977e2a
Update verl/trainer/ppo/ray_trainer.py
Hecate0821 Aug 29, 2025
f65470b
Update verl/trainer/ppo/ray_trainer.py
Hecate0821 Aug 29, 2025
0ffa3bc
Fix: Centrelize settings
Hecate0821 Aug 29, 2025
1e9f49f
Merge branch 'ds_nokl' of https://github.com/PrinsYin/verl into ds_nokl
Hecate0821 Aug 29, 2025
23bdc68
Fix: concentrate the reward logic
Hecate0821 Aug 30, 2025
64ae27f
Fix: make _extract_reward_extra_infos utility function
Hecate0821 Aug 30, 2025
8a72b98
Fix: extract utility function
Hecate0821 Aug 30, 2025
f1e1de2
Fix: Clean
Hecate0821 Aug 30, 2025
9af93ef
Fix: Clean
Hecate0821 Aug 30, 2025
40cd018
Fix: Add credit
Hecate0821 Aug 30, 2025
fbb5020
Fix: Clean
Hecate0821 Aug 30, 2025
d371c97
Fix: Pre-commit
Hecate0821 Aug 30, 2025
ed1520d
Fix: response mask
Hecate0821 Aug 30, 2025
cdc4eb0
Fix: Licence
Hecate0821 Aug 31, 2025
aca70ba
1
PrinsYin Aug 31, 2025
42ac13a
1
PrinsYin Aug 31, 2025
7409e30
fix
PrinsYin Aug 31, 2025
36fc97b
Update verl/trainer/ppo/ray_trainer.py
PrinsYin Aug 31, 2025
da4ab8b
resolve comment
PrinsYin Aug 31, 2025
cc34f16
fix extra info
PrinsYin Aug 31, 2025
08f4c04
fix extra info
PrinsYin Aug 31, 2025
3ab4752
Fix pre-commit
Hecate0821 Aug 31, 2025
31887c4
Fix pre-commit
Hecate0821 Aug 31, 2025
2115f70
Merge branch 'main' into ds_nokl
zhaochenyang20 Sep 3, 2025
8c74805
Fix naming
Hecate0821 Sep 4, 2025
cd5093b
Fix comments
Hecate0821 Sep 10, 2025
c5778c1
Fix repeat traj bug and zip issue
Hecate0821 Sep 12, 2025
809d37a
Fix redundant if
Hecate0821 Sep 12, 2025
1f928e4
1
PrinsYin Sep 12, 2025
e603155
1
PrinsYin Sep 12, 2025
c1ca9c1
1
PrinsYin Sep 12, 2025
8346efb
Merge branch 'volcengine:main' into ds_nokl
zhaochenyang20 Sep 17, 2025
9617d09
Merge branch 'main' into ds_nokl
zhaochenyang20 Oct 1, 2025
1 change: 1 addition & 0 deletions examples/sglang_multiturn/run_qwen3_4b_dapo_multiturn.sh
@@ -88,6 +88,7 @@ python3 -m verl.trainer.main_ppo \
actor_rollout_ref.rollout.val_kwargs.top_p=0.6 \
actor_rollout_ref.rollout.val_kwargs.temperature=1.0 \
actor_rollout_ref.rollout.val_kwargs.n=30 \
algorithm.filter_groups.enable=True \
trainer.logger=['console','wandb'] \
trainer.project_name=sglang-dapo-multiturn \
trainer.experiment_name=qwen3_4b_sft_dapo_multiturn \
13 changes: 10 additions & 3 deletions verl/trainer/config/_generated_ppo_megatron_trainer.yaml
@@ -432,10 +432,14 @@ reward_model:
override_transformer_config: ${oc.select:actor_rollout_ref.actor.megatron.override_transformer_config,{}}
use_mbridge: ${oc.select:actor_rollout_ref.actor.megatron.use_mbridge,False}
load_weight: true
custom_reward_function:
path: null
name: compute_score
algorithm:
filter_groups:
_target_: verl.trainer.config.FilterGroupsConfig
enable: false
metric: seq_reward
max_num_gen_batches: 10
filter_function: verl.utils.filtering.dynamic_filtering.keep_mixed_reward
filter_kwargs: {}
_target_: verl.trainer.config.AlgoConfig
gamma: 1.0
lam: 1.0
@@ -453,6 +457,9 @@ algorithm:
pf_ppo:
reweight_method: pow
weight_pow: 2.0
custom_reward_function:
path: null
name: compute_score
trainer:
balance_batch: true
total_epochs: 30
13 changes: 10 additions & 3 deletions verl/trainer/config/_generated_ppo_trainer.yaml
@@ -394,10 +394,14 @@ reward_model:
save_path: ${oc.select:global_profiler.save_path,null}
tool_config: ${oc.select:actor_rollout_ref.actor.profiler.tool_config,null}
ulysses_sequence_parallel_size: 1
custom_reward_function:
path: null
name: compute_score
algorithm:
filter_groups:
_target_: verl.trainer.config.FilterGroupsConfig
enable: false
metric: seq_reward
max_num_gen_batches: 10
filter_function: verl.utils.filtering.dynamic_filtering.keep_mixed_reward
filter_kwargs: {}
_target_: verl.trainer.config.AlgoConfig
gamma: 1.0
lam: 1.0
@@ -415,6 +419,9 @@ algorithm:
pf_ppo:
reweight_method: pow
weight_pow: 2.0
custom_reward_function:
path: null
name: compute_score
trainer:
balance_batch: true
total_epochs: 30
11 changes: 9 additions & 2 deletions verl/trainer/config/algorithm.py
@@ -48,12 +48,19 @@ class FilterGroupsConfig(BaseConfig):
Args:
enable (bool): Whether to enable filter groups.
metric (Optional[str]): Metric to use for filtering: "acc", "score", "seq_reward", "seq_final_reward", etc.
max_num_gen_batches (int): Non-positive values mean no upper limit.
max_num_gen_batches (int): Maximum number of backfill attempts when collecting diverse responses.
Non-positive values mean no upper limit (use with caution).
filter_function (Optional[str]): Path to filter function (e.g., "my_module.my_filter_func").
Required when filter_groups is enabled. For the original mixed rewards
filter, use "verl.utils.filtering.dynamic_filtering.keep_mixed_reward".
filter_kwargs (Optional[dict]): Additional arguments for the filter function.
"""

enable: bool = False
metric: Optional[str] = None
max_num_gen_batches: int = 0
filter_function: Optional[str] = "verl.utils.filtering.dynamic_filtering.keep_mixed_reward"
filter_kwargs: Optional[dict] = field(default_factory=dict)


@dataclass
@@ -72,7 +79,7 @@ class AlgoConfig(BaseConfig):
kl_ctrl (KLControlConfig): KL control configuration.
use_pf_ppo (bool): Whether to enable preference feedback PPO.
pf_ppo (dict[str, Any]): Preference feedback PPO settings.
filter_groups (Optional[FilterGroupsConfig]): Filter groups configuration, used in DAPO and Entropy
filter_groups (Optional[FilterGroupsConfig]): Dynamic filter configuration, used in DAPO and Entropy
"""

gamma: float = 1.0
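The docstring above requires filter_function to be an importable path and forwards filter_kwargs to it, but the body of keep_mixed_reward, and the exact call signature DynamicFilter expects, is not part of this diff. The following is a hedged sketch only: the argument layout and both function names are assumptions, meant to illustrate the shape such a group-level filter could take.

from collections.abc import Sequence


def keep_when_rewards_disagree(group_rewards: Sequence[float], **filter_kwargs) -> bool:
    """Keep a prompt group only if its rollouts received non-identical rewards
    (the idea behind the default keep_mixed_reward filter)."""
    return len(set(group_rewards)) > 1


def keep_when_reward_std_above(group_rewards: Sequence[float], min_std: float = 0.1, **filter_kwargs) -> bool:
    """Custom variant: keep groups whose reward spread is at least min_std;
    min_std would be supplied via algorithm.filter_groups.filter_kwargs."""
    mean = sum(group_rewards) / len(group_rewards)
    std = (sum((r - mean) ** 2 for r in group_rewards) / len(group_rewards)) ** 0.5
    return std >= min_std

A custom filter of this shape would then be selected with algorithm.filter_groups.filter_function=my_module.keep_when_reward_std_above (my_module is a placeholder) and tuned through algorithm.filter_groups.filter_kwargs.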
26 changes: 26 additions & 0 deletions verl/trainer/config/algorithm/filter_groups.yaml
@@ -0,0 +1,26 @@
# Format checks enforced on CI:
# 1. Comments must appear above each field.
# 2. There must be a blank line between each field.
# 3. Inline comments (after a field on the same line) are not allowed.
# 4. Indentation level is respected for nested fields.

# Dynamic filter for DAPO: filters out homogeneous groups, keeps diverse responses

# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.trainer.config.FilterGroupsConfig

# Whether to enable dynamic filter
enable: False

# Metric to use for dynamic filter: currently only "seq_reward" is supported
metric: seq_reward

# Maximum number of backfill attempts when collecting diverse responses
# If set to 0 or negative, allows unlimited backfill attempts (use with caution)
max_num_gen_batches: 10

# Default filter function for mixed reward filtering
filter_function: verl.utils.filtering.dynamic_filtering.keep_mixed_reward

# Additional arguments for the filter function
filter_kwargs: {}
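The _target_ comment above points at verl's omega_conf_to_dataclass helper, which this PR also imports in ray_trainer.py (from verl.utils.config). Below is a minimal sketch of how such a block could be materialized into a FilterGroupsConfig, assuming the helper resolves _target_ when no explicit dataclass type is passed; that call pattern is not shown in this diff.

from omegaconf import OmegaConf

from verl.utils.config import omega_conf_to_dataclass

# In-memory mirror of the YAML above, built only for illustration.
filter_cfg = OmegaConf.create(
    {
        "_target_": "verl.trainer.config.FilterGroupsConfig",
        "enable": True,
        "metric": "seq_reward",
        "max_num_gen_batches": 10,
        "filter_function": "verl.utils.filtering.dynamic_filtering.keep_mixed_reward",
        "filter_kwargs": {},
    }
)

# Instantiate the dataclass named by _target_.
filter_groups = omega_conf_to_dataclass(filter_cfg)
print(filter_groups.enable, filter_groups.metric, filter_groups.max_num_gen_batches)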
2 changes: 2 additions & 0 deletions verl/trainer/config/ppo_megatron_trainer.yaml
@@ -16,6 +16,8 @@ defaults:
- critic@critic: megatron_critic
# Reward model config.
- reward_model@reward_model: megatron_reward_model
# Algorithm filter groups config.
- algorithm/[email protected]_groups
- _self_

actor_rollout_ref:
3 changes: 3 additions & 0 deletions verl/trainer/config/ppo_trainer.yaml
@@ -27,6 +27,9 @@ defaults:
# Reward model config.
- reward_model@reward_model: dp_reward_model

# Algorithm filter groups config.
- algorithm/[email protected]_groups

# load the reference default config, then apply the fields in the current yaml
# self config override anything above
- _self_
33 changes: 33 additions & 0 deletions verl/trainer/ppo/metric_utils.py
@@ -488,3 +488,36 @@ def process_validation_metrics(
data_src2var2metric2val[data_source][var_name][metric_name] = np.mean(uid_vals)

return data_src2var2metric2val


def compute_reward_metrics(batch: DataProto) -> dict[str, Any]:
"""
Computes reward-related metrics from a batch of data for PPO training.

This function computes metrics from the RAW batch BEFORE any dynamic filtering
is applied. When using dynamic filtering (DAPO), this captures the reward distribution
of ALL generated responses, including those that will be filtered out for being too
homogeneous. This provides insight into the raw reward signal quality before diversity
filtering removes low-variance response groups.

This function calculates statistics (mean, std, max, min) for sequence-level rewards
derived from token-level scores.

Args:
batch: A DataProto object containing batch data with token-level scores

Returns:
A dictionary of reward metrics including:
- before_filtering/reward/mean: Mean sequence reward (pre-filtering)
- before_filtering/reward/std: Standard deviation of sequence rewards (pre-filtering)
- before_filtering/reward/max: Maximum sequence reward (pre-filtering)
- before_filtering/reward/min: Minimum sequence reward (pre-filtering)
"""
seq_reward_tensor = batch.batch["token_level_scores"].sum(-1)

return {
"before_filtering/reward/mean": seq_reward_tensor.mean().detach().item(),
"before_filtering/reward/std": seq_reward_tensor.std().detach().item(),
"before_filtering/reward/max": seq_reward_tensor.max().detach().item(),
"before_filtering/reward/min": seq_reward_tensor.min().detach().item(),
}
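For intuition, the toy tensor below (not verl code) reproduces the sequence-level reduction used above: token_level_scores is summed over the token dimension to give one scalar reward per sequence, and the four metrics are plain statistics of that vector.

import torch

# Three sequences, three token positions; only a few tokens carry score mass.
token_level_scores = torch.tensor(
    [
        [0.0, 0.0, 1.0],  # sequence 0 -> reward 1.0
        [0.0, 0.0, 0.0],  # sequence 1 -> reward 0.0
        [0.0, 1.0, 1.0],  # sequence 2 -> reward 2.0
    ]
)

seq_reward = token_level_scores.sum(-1)
print(seq_reward.mean().item(), seq_reward.std().item(), seq_reward.max().item(), seq_reward.min().item())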
96 changes: 75 additions & 21 deletions verl/trainer/ppo/ray_trainer.py
@@ -45,20 +45,28 @@
from verl.trainer.ppo.core_algos import AdvantageEstimator, agg_loss
from verl.trainer.ppo.metric_utils import (
compute_data_metrics,
compute_reward_metrics,
compute_throughout_metrics,
compute_timing_metrics,
process_validation_metrics,
)
from verl.trainer.ppo.reward import compute_reward, compute_reward_async
from verl.trainer.ppo.utils import Role, WorkerType, need_critic, need_reference_policy, need_reward_model
from verl.trainer.ppo.reward import compute_reward, compute_reward_async, extract_reward_extra_infos
from verl.trainer.ppo.utils import (
Role,
WorkerType,
need_critic,
need_reference_policy,
need_reward_model,
)
from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path, should_save_ckpt_esi
from verl.utils.config import omega_conf_to_dataclass
from verl.utils.debug import marked_timer
from verl.utils.filtering.dynamic_filtering import DynamicFilter
from verl.utils.metric import reduce_metrics
from verl.utils.rollout_skip import RolloutSkip
from verl.utils.seqlen_balancing import get_seqlen_balanced_partitions, log_seqlen_unbalance
from verl.utils.torch_functional import masked_mean
from verl.utils.tracking import ValidationGenerationsLogger
from verl.utils.tracking import Tracking, ValidationGenerationsLogger


@dataclass
@@ -352,6 +360,12 @@ def __init__(
if self.config.algorithm.use_kl_in_reward:
self.kl_ctrl_in_reward = core_algos.get_kl_controller(self.config.algorithm.kl_ctrl)

self.dynamic_filter = (
DynamicFilter(config=self.config)
if self.config.algorithm.filter_groups and self.config.algorithm.filter_groups.enable
else None
)

self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler)

def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampler: Optional[Sampler]):
@@ -909,15 +923,12 @@ def fit(self):
"""
from omegaconf import OmegaConf

from verl.utils.tracking import Tracking

logger = Tracking(
project_name=self.config.trainer.project_name,
experiment_name=self.config.trainer.experiment_name,
default_backend=self.config.trainer.logger,
config=OmegaConf.to_container(self.config, resolve=True),
)

self.global_steps = 0

# load checkpoint before doing anything
@@ -958,6 +969,10 @@ def fit(self):
metrics = {}
timing_raw = {}

# dynamic filter
if self.dynamic_filter:
self.dynamic_filter.increment_gen_batches()

with marked_timer("start_profile", timing_raw):
self._start_profiling(
not prev_step_profile and curr_step_profile
@@ -1017,27 +1032,56 @@

if "response_mask" not in batch.batch.keys():
batch.batch["response_mask"] = compute_response_mask(batch)
# Balance the number of valid tokens across DP ranks.
# NOTE: This usually changes the order of data in the `batch`,
# which won't affect the advantage calculation (since it's based on uid),
# but might affect the loss calculation (due to the change of mini-batching).
# TODO: Decouple the DP balancing and mini-batching.
if self.config.trainer.balance_batch:
self._balance_batch(batch, metrics=metrics)

# compute global_valid tokens
batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist()

# Compute all reward scores in one consolidated block
with marked_timer("reward", timing_raw, color="yellow"):
# compute reward model score
if self.use_rm and "rm_scores" not in batch.batch.keys():
reward_tensor = self.rm_wg.compute_rm_score(batch)
batch = batch.union(reward_tensor)

# compute reward function score
reward_extra_infos_dict = {}
if self.config.reward_model.launch_reward_fn_async:
future_reward = compute_reward_async.remote(data=batch, reward_fn=self.reward_fn)
else:
reward_tensor, reward_extra_infos_dict = compute_reward(batch, self.reward_fn)
# Set token_level_scores immediately for sync case (needed for compute_reward_metrics)
batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
batch.non_tensor_batch.update(
{k: np.array(v) for k, v in reward_extra_infos_dict.items()}
)

# Compute reward metrics
if self.dynamic_filter and self.dynamic_filter.increment_reward_step(self.global_steps):
reward_metrics = compute_reward_metrics(batch)
metrics.update(reward_metrics)

# Apply dynamic filtering after reward computation
if self.dynamic_filter:
# Apply dynamic filtering and handle batch accumulation
processed_batch, should_continue = self.dynamic_filter.process_batch_with_filtering(
batch,
self.config,
)

if should_continue:
continue

batch = processed_batch

# Balance the number of valid tokens across DP ranks.
# NOTE: This usually changes the order of data in the `batch`,
# which won't affect the advantage calculation (since it's based on uid),
# but might affect the loss calculation (due to the change of mini-batching).
# TODO: Decouple the DP balancing and mini-batching.
if self.config.trainer.balance_batch:
self._balance_batch(batch, metrics=metrics)

# compute global_valid tokens
batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist()

# recompute old_log_probs
with marked_timer("old_log_prob", timing_raw, color="blue"):
@@ -1074,14 +1118,16 @@

with marked_timer("adv", timing_raw, color="brown"):
# we combine with rule-based rm
reward_extra_infos_dict: dict[str, list]
if self.config.reward_model.launch_reward_fn_async:
reward_tensor, reward_extra_infos_dict = ray.get(future_reward)
batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
batch.non_tensor_batch.update({k: np.array(v) for k, v in reward_extra_infos_dict.items()})
# Set token_level_scores for async case
batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
batch.non_tensor_batch.update(
{k: np.array(v) for k, v in reward_extra_infos_dict.items()}
)
# For sync case, token_level_scores and extra_infos are already set above
# compute rewards. apply_kl_penalty if available
if self.config.algorithm.use_kl_in_reward:
batch, kl_metrics = apply_kl_penalty(
@@ -1135,6 +1181,10 @@ def fit(self):
for item in batch
]

reward_extra_infos_dict = extract_reward_extra_infos(
batch, set(reward_extra_infos_dict.keys())
)

if "request_id" in batch.non_tensor_batch:
reward_extra_infos_dict.setdefault(
"request_id",
@@ -1223,6 +1273,10 @@ def fit(self):
progress_bar.update(1)
self.global_steps += 1

# Reset dynamic filter state for next training step
if self.dynamic_filter:
self.dynamic_filter.clear()

if (
hasattr(self.config.actor_rollout_ref.actor, "profiler")
and self.config.actor_rollout_ref.actor.profiler.tool == "torch_memory"
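DynamicFilter itself is defined in verl/utils/filtering/dynamic_filtering.py and its implementation is not part of this diff. From the trainer call sites above, its surface looks roughly like the sketch below; the method names come from the trainer code, while the state fields, bodies, and comments are assumptions.

from typing import Any


class DynamicFilterSketch:
    """Rough interface inferred from ray_trainer.py; not the real implementation."""

    def __init__(self, config: Any):
        self.config = config
        self.num_gen_batches = 0       # backfill attempts in the current training step
        self.accumulated_batch = None  # kept trajectories waiting for a full batch

    def increment_gen_batches(self) -> None:
        # Called once per generation round; presumably checked against
        # algorithm.filter_groups.max_num_gen_batches.
        self.num_gen_batches += 1

    def increment_reward_step(self, global_step: int) -> bool:
        # Returns True when pre-filtering reward metrics should be logged this step.
        return True

    def process_batch_with_filtering(self, batch: Any, config: Any) -> tuple[Any, bool]:
        # Drops homogeneous groups, merges survivors with earlier leftovers, and
        # returns (batch, should_continue); should_continue=True asks the trainer
        # to generate another rollout batch before training on an undersized batch.
        raise NotImplementedError

    def clear(self) -> None:
        # Resets accumulation state once a training step completes.
        self.num_gen_batches = 0
        self.accumulated_batch = None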
9 changes: 9 additions & 0 deletions verl/trainer/ppo/reward.py
@@ -186,3 +186,12 @@ def compute_reward_async(data: DataProto, config=None, tokenizer=None, reward_fn
)

return compute_reward(data, reward_fn)


def extract_reward_extra_infos(batch: DataProto, reward_extra_info_keys: list[str]) -> dict[str, list]:
"""Extract reward extra info from batch.non_tensor_batch for dump_generations."""
reward_extra_infos_dict = {}
for key in reward_extra_info_keys:
reward_extra_infos_dict[key] = batch.non_tensor_batch[key]

return reward_extra_infos_dict
23 changes: 23 additions & 0 deletions verl/utils/filtering/__init__.py
@@ -0,0 +1,23 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
# Copyright 2023-2024 SGLang Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Reference:
# - DAPO: An Open-Source LLM Reinforcement Learning System at Scale
# Paper: https://arxiv.org/abs/2503.14476
# - This implementation references the ReTool implementation: recipe/retool/ in VERL codebase

from .dynamic_filtering import DynamicFilter, keep_mixed_reward

__all__ = ["DynamicFilter", "keep_mixed_reward"]