
Verl: Common Parameter Configuration and Understanding

📅 Published 2025/10/31
🔄 Updated 2025/10/31
verl
#verl
#max_new_tokens
#rollout.response_length
#rollout.prompt_length
#data.train_batch_size
#actor.ppo_mini_batch_size
#actor.ppo_micro_batch_size_per_gpu
#rollout.log_prob_micro_batch_size_per_gpu
#ref.log_prob_micro_batch_size_per_gpu
#model_parallel_size
#tensor_model_parallel_size
#pipeline_model_parallel_size
#num_attention_heads
#num_layers
#max_token_len_per_gpu
#sglang
#Yarn
#rope_scaling
#recompute_granularity
#recompute_modules
#model_merger

Understanding the Key Parameters

Maximum new tokens per generation turn

max_new_tokens

Reference article

Maximum model length (the YaRN configuration is covered below)

  • rollout.max_model_len = rollout.prompt_length + rollout.response_length
    • Determined by the rollout prompt length plus response length

Maximum number of decoded tokens

  • The smaller of the following two values:
    • the configured maximum response length, rollout.response_length
    • the room left for the current prompt: max_model_len - len(prompt_ids) - 1
python
@ray.remote(num_cpus=1)
class SGLangHttpServer:
    def __init__(
        self,
        config: RolloutConfig,
        model_config: HFModelConfig,
        rollout_mode: RolloutMode,
        workers: list[ActorHandle],
        replica_rank: int,
        node_rank: int,
        nnodes: int,
        cuda_visible_devices: str,
    ):
        print(f"SGLang http server: {rollout_mode=}, {replica_rank=}, {node_rank=}, {nnodes=}, {cuda_visible_devices=}")
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices
        assert torch.cuda.is_available(), "SGLang http server should run on GPU node"

        self.config: RolloutConfig = omega_conf_to_dataclass(config)
        self.model_config: HFModelConfig = omega_conf_to_dataclass(model_config, dataclass_type=HFModelConfig)
        self.config.max_model_len = self.config.prompt_length + self.config.response_length 
        self.rollout_mode = rollout_mode
        self.workers = workers

        self.replica_rank = replica_rank
        self.node_rank = node_rank
        self.nnodes = nnodes
        # ....
    
    async def generate(
        self,
        prompt_ids: torch.Tensor,
        sampling_params: dict[str, Any],
        request_id: str,
        image_data: Optional[list[Any]] = None,
    ) -> TokenOutput:
        """Generate sequence with token-in-token-out."""
        max_new_tokens = min(self.config.response_length, self.config.max_model_len - len(prompt_ids) - 1) 
        sampling_params["max_new_tokens"] = max_new_tokens
        return_logprob = sampling_params.pop("logprobs", False)

        request = GenerateReqInput(
            rid=request_id,
            input_ids=prompt_ids,
            sampling_params=sampling_params,
            return_logprob=return_logprob,
            image_data=image_data,
        )
        output = await self.tokenizer_manager.generate_request(request, None).__anext__()
        if return_logprob:
            output_token_logprobs = output["meta_info"]["output_token_logprobs"]
            log_probs, token_ids = zip(
                *[(log_prob, token_ids) for log_prob, token_ids, _ in output_token_logprobs], strict=True
            )
        else:
            token_ids = output["output_ids"]
            log_probs = None
        return TokenOutput(token_ids=token_ids, log_probs=log_probs)
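
A quick numeric check of the formula above, using made-up lengths (prompt_length=2048 and response_length=4096 are assumptions for illustration, not recommended values):

python
# Hypothetical config values, for illustration only.
prompt_length, response_length = 2048, 4096
max_model_len = prompt_length + response_length                     # 6144

prompt_ids = list(range(1500))                                      # a 1500-token prompt
print(min(response_length, max_model_len - len(prompt_ids) - 1))    # 4096, capped by response_length

prompt_ids = list(range(5000))                                      # a prompt close to max_model_len
print(min(response_length, max_model_len - len(prompt_ids) - 1))    # 1143, capped by the remaining context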

batch_size-Related Parameters

Quick summary

batch_size summary (a numeric sketch follows this list)
  • data.train_batch_size
    • Training batch size; multiplied by rollout.n to obtain real_train_batch_size
  • actor.ppo_mini_batch_size
    • When updating the model, the real_train_batch is split into multiple mini_batches
    • One real_train_batch drives multiple updates; each mini_batch drives exactly one update
  • actor.ppo_micro_batch_size_per_gpu
    • When updating the model, each mini_batch is split into multiple micro_batches that are processed one after another with gradient accumulation
    • micro_batch_size = ppo_micro_batch_size_per_gpu
  • rollout.log_prob_micro_batch_size_per_gpu
    • When computing old_log_prob, the real_train_batch is split into multiple micro_batches that are processed one after another
  • ref.log_prob_micro_batch_size_per_gpu
    • When computing ref_log_prob, the real_train_batch is split into multiple micro_batches that are processed one after another
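
A minimal numeric sketch of the hierarchy above; all values are assumptions for illustration, not recommendations:

python
# Hypothetical config values, for illustration only.
train_batch_size = 32
rollout_n = 4                        # rollout.n
ppo_mini_batch_size = 32
ppo_micro_batch_size_per_gpu = 4
dp_size = 8                          # assumed data-parallel world size

real_train_batch_size = train_batch_size * rollout_n                   # 128 sequences per training step
num_mini_batches = real_train_batch_size // ppo_mini_batch_size        # 4 optimizer updates per rollout batch
per_gpu_mini_batch = ppo_mini_batch_size // dp_size                    # 4 sequences per GPU per mini-batch
grad_accum_steps = per_gpu_mini_batch // ppo_micro_batch_size_per_gpu  # 1 micro-batch (no accumulation here)

print(real_train_batch_size, num_mini_batches, per_gpu_mini_batch, grad_accum_steps)  # 128 4 4 1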

data.train_batch_size

Meaning

  • train_batch_size: batch size of the training set, used when iterating over the data loader during training
  • val_batch_size: analogous; not repeated here

Key points

  • Combined with rollout.n to build real_train_batch_size
    • real_train_batch_size = train_batch_size * rollout.n

Reference notes

Load the training data and iterate over it with batch_size = train_batch_size

python
self.train_dataloader = StatefulDataLoader( 
    dataset=self.train_dataset, 
    batch_size=self.config.data.get("gen_batch_size", self.config.data.train_batch_size),  
    num_workers=num_workers,
    drop_last=True,
    collate_fn=collate_fn,
    sampler=train_sampler,
)

All subsequent rollout, update, and other steps operate on this batch

python
for epoch in range(current_epoch, self.config.trainer.total_epochs):  
    for batch_dict in self.train_dataloader:   
      # ...
      # ...
      gen_batch = self._get_gen_batch(batch)
      # pass global_steps to trace
      gen_batch.meta_info["global_steps"] = self.global_steps
      gen_batch_output = gen_batch.repeat( 
          repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True
      ) 
      # other operations ...
      # ....
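
The gen_batch.repeat(repeat_times=rollout.n, interleave=True) call above duplicates each prompt rollout.n times, keeping the copies adjacent. A minimal sketch of the same idea with plain torch (not verl's DataProto implementation):

python
import torch

prompt_ids = torch.tensor([[11, 12], [21, 22]])   # 2 prompts
rollout_n = 3
# interleave=True keeps the n copies of each prompt next to each other
expanded = prompt_ids.repeat_interleave(rollout_n, dim=0)
print(expanded.tolist())
# [[11, 12], [11, 12], [11, 12], [21, 22], [21, 22], [21, 22]]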

actor.ppo_mini_batch_size

Meaning

  • When updating the actor (or critic), the current real_train_batch is split

    • into multiple mini_batches of size ppo_mini_batch_size
    • each mini_batch triggers exactly one model update
    • i.e. within a single real_train_batch, the model is updated multiple times, one mini_batch at a time
  • real_train_batch_size -> ppo_mini_batch_size

  • Usually combined with ppo_epochs, which controls how many times the data is reused (a quick calculation follows this list).

  • The critic works the same way

    • the parameter is critic.ppo_mini_batch_size
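
A quick back-of-the-envelope consequence of the list above, with assumed values:

python
# Hypothetical values, for illustration only.
real_train_batch_size = 128
ppo_mini_batch_size = 32
ppo_epochs = 2

# each mini-batch is one optimizer step; ppo_epochs re-iterates the whole batch
optimizer_steps_per_rollout_batch = ppo_epochs * real_train_batch_size // ppo_mini_batch_size
print(optimizer_steps_per_rollout_batch)   # 8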

Reference notes

ActorRolloutRefWorker.update_actor, notes on the update_actor main entry point

python
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
@GPUMemoryLogger(role="update_actor", logger=logger)
@DistProfiler.annotate(color="red")
def update_actor(self, data: DataProto):
    # ...
    micro_batch_size = self.config.actor.ppo_micro_batch_size_per_gpu
    data.meta_info["micro_batch_size"] = micro_batch_size
    dataloader = self.actor.make_minibatch_iterator(data=data)   
    with Timer(name="update_policy", logger=None) as timer:
        metrics = self.actor.update_policy(dataloader=dataloader)  
    # ...

MegatronPPOActor.update_policy, update_policy notes

python
@GPUMemoryLogger(role="megatron actor", logger=logger)
def update_policy(self, dataloader: Iterable[DataProto]) -> dict:
    for data in dataloader:  
        # within one real_train_batch, the model is updated multiple times
        # ...
        self.actor_optimizer.zero_grad()
        calculate_entropy = self.config.entropy_coeff != 0
        micro_batch_size = self.config.ppo_micro_batch_size_per_gpu
        max_token_len = None
        if self.config.use_dynamic_bsz:
            max_token_len = self.config.ppo_max_token_len_per_gpu * self.config.megatron.context_parallel_size 
        # ....
        # call forward_backward_batch
        metric_micro_batch = self.forward_backward_batch(   
            data, 
            calculate_entropy=calculate_entropy, 
            use_dynamic_bsz=self.config.use_dynamic_bsz, 
            micro_batch_size=micro_batch_size, 
            max_token_len=max_token_len, 
            mini_batch_size=self.config.ppo_mini_batch_size, 
        ) 
        # ...
        # update the model
        update_successful, grad_norm, num_zeros_in_grad = self.actor_optimizer.step()  
        # ...

CriticWorker.update_critic, CriticWorker.update_critic notes

python
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="critic"))
@DistProfiler.annotate(color="pink")
def update_critic(self, data: DataProto):
    data = data.to(get_device_id())
    # ...
    dataloader = self.critic.make_minibatch_iterator(data)   
    with Timer(name="update_critic", logger=None) as timer:
        metrics = self.critic.update_critic(dataloader=dataloader) 
    # ...

MegatronPPOCritic.update_critic works the same way, PPOCritic.update_critic notes

python
@GPUMemoryLogger("megatron critic", logger=logger)
def update_critic(self, dataloader: Iterable[DataProto]):
    metrics = {}
    for data in dataloader:   
        self.critic_optimizer.zero_grad()
        micro_batch_size = self.config.ppo_micro_batch_size_per_gpu 
        max_token_len = None
        if self.config.use_dynamic_bsz:
            max_token_len = self.config.ppo_max_token_len_per_gpu * self.config.megatron.context_parallel_size
        # the core part
        metric_micro_batch = self.forward_backward_batch(  
            data,
            forward_only=False,
            use_dynamic_bsz=self.config.use_dynamic_bsz,
            micro_batch_size=micro_batch_size, 
            max_token_len=max_token_len,
            mini_batch_size=self.config.ppo_mini_batch_size, 
        )
        metric_micro_batch = metric_micro_batch["output"]
        update_successful, grad_norm, num_zeros_in_grad = self.critic_optimizer.step()  
        # ...

MegatronPPOActor.make_minibatch_iterator, make_minibatch_iterator code notes

python
def make_minibatch_iterator(self, data: DataProto) -> Iterable[DataProto]:
    select_keys = [
        "responses",
        "input_ids",
        "attention_mask",
        "response_mask",
        "position_ids",
        "old_log_probs",
        "advantages",
    ]
    # ...
    # ...
    data = data.select(batch_keys=select_keys)
    return data.make_iterator(
        mini_batch_size=self.config.ppo_mini_batch_size, 
        epochs=self.config.ppo_epochs, 
        seed=self.config.data_loader_seed,
        dataloader_kwargs={"shuffle": self.config.shuffle},
    )

actor.ppo_micro_batch_size_per_gpu (gradient accumulation)

Background

  • A single forward-backward pass over a whole mini_batch would OOM, so it is split into multiple steps.
  • Gradient accumulation: gradients are computed but not zeroed; they accumulate over multiple steps and are applied in a single optimizer update at the end (a generic sketch follows this list).
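
A minimal, generic PyTorch sketch of gradient accumulation (this is the general pattern, not verl's code): gradients from several micro-batches accumulate in .grad before a single optimizer step.

python
import torch
import torch.nn as nn

model = nn.Linear(16, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
loss_fn = nn.MSELoss()

mini_batch_x = torch.randn(8, 16)   # one mini-batch on this GPU
mini_batch_y = torch.randn(8, 1)
micro_batch_size = 2                # plays the role of ppo_micro_batch_size_per_gpu

optimizer.zero_grad()
micro_xs = mini_batch_x.split(micro_batch_size)
micro_ys = mini_batch_y.split(micro_batch_size)
for x, y in zip(micro_xs, micro_ys):
    loss = loss_fn(model(x), y) / len(micro_xs)   # scale so the sum matches the full mini-batch loss
    loss.backward()                               # gradients accumulate into .grad, no zeroing in between
optimizer.step()                                  # one parameter update for the whole mini-batch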

Meaning

  • When updating the actor (or critic), the current mini_batch is split
    • into multiple micro-batches
    • micro_batch_size = ppo_micro_batch_size_per_gpu
    • this effectively sets the number of gradient-accumulation steps
  • real_train_batch_size -> ppo_mini_batch_size -> ppo_micro_batch_size_per_gpu
  • The critic works the same way
    • the parameter is critic.ppo_micro_batch_size_per_gpu

The following three are analogous

  • rollout.log_prob_micro_batch_size_per_gpu (computes old_log_prob)
  • ref.log_prob_micro_batch_size_per_gpu (computes ref_log_prob)
  • actor.ppo_micro_batch_size_per_gpu (model update; splits the mini-batch)

ActorRolloutRefWorker.update_actor, notes on the update_actor main entry point

python
def update_actor(self, data: DataProto):
    # ...
    micro_batch_size = self.config.actor.ppo_micro_batch_size_per_gpu 
    data.meta_info["micro_batch_size"] = micro_batch_size  
    dataloader = self.actor.make_minibatch_iterator(data=data)
    with Timer(name="update_policy", logger=None) as timer:
        metrics = self.actor.update_policy(dataloader=dataloader)  
    # ...

MegatronPPOActor.update_policy, update_policy notes

python
@GPUMemoryLogger(role="megatron actor", logger=logger)
def update_policy(self, dataloader: Iterable[DataProto]) -> dict:
    metrics = {}
    for data in dataloader: 
        self.actor_optimizer.zero_grad()
        # ...
        calculate_entropy = self.config.entropy_coeff != 0
        if data.meta_info.get("micro_batch_size", None) is not None:
            micro_batch_size = data.meta_info["micro_batch_size"]  
        else:
            micro_batch_size = self.config.ppo_micro_batch_size_per_gpu
        # call forward_backward_batch; see above for details
        metric_micro_batch = self.forward_backward_batch(  
            data, 
            calculate_entropy=calculate_entropy, 
            use_dynamic_bsz=self.config.use_dynamic_bsz, 
            micro_batch_size=micro_batch_size,  
            max_token_len=max_token_len, 
            mini_batch_size=self.config.ppo_mini_batch_size, 
        ) 
        # collect loss and other metrics
        # ...
        # update the model
        update_successful, grad_norm, num_zeros_in_grad = self.actor_optimizer.step() 
        data = {"actor/grad_norm": grad_norm}
        # ....

MegatronPPOActor.forward_backward_batch, forward_backward_batch notes

python
def forward_backward_batch(
        self,
        data: DataProto,
        forward_only=False,
        use_dynamic_bsz=False,
        micro_batch_size=None,
        max_token_len=None,
        mini_batch_size=None,
    ):
    # broadcast from last pp rank to all other pp ranks
    # ...
    mini_batch = data
    mini_batch.batch = mini_batch.batch.contiguous()
    # ...
    if use_dynamic_bsz:
        pass  # dynamic batch-size handling elided in this excerpt
    else:
        micro_batches = mini_batch.batch.split(micro_batch_size)  
        seq_len = micro_batches[0]["input_ids"].shape[1] 
        total_seqlen = micro_batch_size * seq_len
    
    n_micro_batch = len(micro_batches) 
    forward_backward_func = get_forward_backward_func()
    # batch should be a list of batches inside micro-batches
    batch_generator = make_batch_generator(micro_batches, vpp_size=len(self.critic_module))  

    # TODO: we may use the new schedule instead
    # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size)
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        losses_reduced = forward_backward_func(
            forward_step_func=forward_step,
            data_iterator=batch_generator,  
            model=self.critic_module,
            num_microbatches=n_micro_batch,
            seq_length=total_seqlen,  # no use when input_shapes was set
            micro_batch_size=1,  # no use when input_shapes was set
            forward_only=forward_only,
        )
    else:
        losses_reduced = forward_backward_func( 
            forward_step_func=forward_step, 
            data_iterator=batch_generator,  
            model=self.critic_module, 
            num_microbatches=n_micro_batch, 
            seq_length=total_seqlen,  # in use for pp = 1 #
            micro_batch_size=1,  # in use for pp = 1 #
            forward_only=forward_only, 
        ) 
    # loss_reduces contains the stats returned from loss_func
    losses_reduced = {"output": losses_reduced} 
    if use_dynamic_bsz:
        losses_reduced["indices"] = indices
    return losses_reduced

For the next step, see the notes for details.

rollout.log_prob_micro_batch_size_per_gpu

Meaning

  • During rollout, when computing log_prob
    • the real_train_batch is split into multiple micro-batches
    • micro_batch_size = log_prob_micro_batch_size_per_gpu
  • real_train_batch_size -> log_prob_micro_batch_size_per_gpu
  • log_prob_micro_batch_size can usually be set much larger than ppo_micro_batch_size, since this pass is forward-only (a sketch follows this list)
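
A minimal, generic sketch of this forward-only micro-batching (not verl's code); model_fn is a placeholder for a forward pass that returns logits:

python
import torch

@torch.no_grad()
def compute_log_probs_in_micro_batches(model_fn, input_ids, micro_batch_size):
    """Split a large batch into micro-batches and run forward-only passes."""
    outputs = []
    for micro in input_ids.split(micro_batch_size):
        logits = model_fn(micro)                      # (micro_bs, seq_len, vocab)
        outputs.append(torch.log_softmax(logits, dim=-1))
    return torch.cat(outputs, dim=0)

# usage with a dummy "model"
dummy_model = lambda ids: torch.randn(ids.shape[0], ids.shape[1], 32)
ids = torch.randint(0, 32, (16, 8))
print(compute_log_probs_in_micro_batches(dummy_model, ids, micro_batch_size=4).shape)  # (16, 8, 32)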

The following three are analogous

  • rollout.log_prob_micro_batch_size_per_gpu (computes old_log_prob)
  • ref.log_prob_micro_batch_size_per_gpu (computes ref_log_prob)
  • actor.ppo_micro_batch_size_per_gpu (model update; splits the mini-batch)

ActorRolloutRefWorker.compute_log_prob, compute_log_prob notes

python
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
@GPUMemoryLogger(role="compute_log_prob", logger=logger)
@DistProfiler.annotate(color="blue")
def compute_log_prob(self, data: DataProto):
    data.meta_info["micro_batch_size"] = self.config.rollout.log_prob_micro_batch_size_per_gpu   
    # ...
    # core part: delegate the computation to the megatron actor
    output, entropys = self.actor.compute_log_prob(data=data, calculate_entropy=True)
    # output format
    output = DataProto.from_dict( 
        tensors={"old_log_probs": output, "entropys": entropys}, 
        meta_info={"temperature": self.config.rollout.temperature}, 
    ) 
    # ...
    return output

The underlying forward_backward_batch call (with forward_only=True) is the same code as the forward_backward_batch excerpt shown in the actor section above.

ref.log_prob_micro_batch_size_per_gpu

Meaning

  • During rollout, when computing ref_log_prob
    • the real_train_batch is split into multiple micro-batches
    • micro_batch_size = log_prob_micro_batch_size_per_gpu
  • real_train_batch_size -> log_prob_micro_batch_size_per_gpu

The following three are analogous

  • rollout.log_prob_micro_batch_size_per_gpu (computes old_log_prob)
  • ref.log_prob_micro_batch_size_per_gpu (computes ref_log_prob)
  • actor.ppo_micro_batch_size_per_gpu (model update; splits the mini-batch)

Key Parameter Constraints

Parameter calculations

Related parallelism calculations (a numeric sketch follows this list)
  • n_gpus = n_gpus_per_node * nnodes
  • model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size
  • megatron_dp = n_gpus // (model_parallel_size * context_parallel_size)
  • minimal_bsz = megatron_dp * actor.ppo_micro_batch_size_per_gpu
  • real_train_batch_size = data.train_batch_size * rollout.n
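
A minimal sketch that plugs assumed numbers into the formulas above (2 nodes x 8 GPUs, TP=4, PP=2, CP=1 are made-up values):

python
# Hypothetical setup, for illustration only.
n_gpus_per_node, nnodes = 8, 2
tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size = 4, 2, 1
ppo_micro_batch_size_per_gpu = 2
train_batch_size, rollout_n = 64, 4

n_gpus = n_gpus_per_node * nnodes                                                  # 16
model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size    # 8
megatron_dp = n_gpus // (model_parallel_size * context_parallel_size)              # 2
minimal_bsz = megatron_dp * ppo_micro_batch_size_per_gpu                           # 4
real_train_batch_size = train_batch_size * rollout_n                               # 256
print(n_gpus, model_parallel_size, megatron_dp, minimal_bsz, real_train_batch_size)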

Parameter constraints

Some of the parameter constraints (a checking sketch follows this list)

real_train_batch_size must be divisible by minimal_bsz

  • real_train_batch_size % minimal_bsz == 0

Number of GPUs must be divisible by the parallel size

  • n_gpus % (model_parallel_size * context_parallel_size) == 0  (very important!)

TP constraint

  • num_attention_heads % tensor_model_parallel_size == 0

PP constraint

  • num_layers % pipeline_model_parallel_size == 0

EP constraint

  • num_moe_experts % moe_router_num_groups == 0

CP constraint

  • n_gpus % (model_parallel_size * context_parallel_size) == 0  (very important!)
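
Continuing the numbers from the sketch above, the constraints can be asserted up front before launching a run (the model shape is an assumption):

python
# Hypothetical values matching the sketch in the previous section.
n_gpus, model_parallel_size, context_parallel_size = 16, 8, 1
tensor_model_parallel_size, pipeline_model_parallel_size = 4, 2
real_train_batch_size, minimal_bsz = 256, 4
num_attention_heads, num_layers = 32, 32      # assumed model shape

assert real_train_batch_size % minimal_bsz == 0
assert n_gpus % (model_parallel_size * context_parallel_size) == 0   # note the parentheses
assert num_attention_heads % tensor_model_parallel_size == 0
assert num_layers % pipeline_model_parallel_size == 0
print("all parallelism constraints satisfied")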

max_token_len_per_gpu

Meaning

  • As the name suggests, the maximum number of tokens handled per GPU.

Role

  • In forward_backward_batch, used when computing total_seqlen
    • total_seqlen = micro_batch_size * seq_len
  • In practice this value does not really take effect, because the shapes are fixed and padded in advance; see the forward_backward_batch notes (a sketch of the token-budget idea follows this list)
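
A minimal sketch of the token-budget idea behind max_token_len_per_gpu when dynamic batch sizes are used: pack sequences into micro-batches so each stays under the budget. This is a greedy illustration only; verl's actual dynamic batching is more involved (it also balances sequence lengths).

python
def pack_by_token_budget(seq_lens, max_token_len_per_gpu):
    """Greedy sketch: group sequence indices into micro-batches whose total
    token count stays under max_token_len_per_gpu (illustrative only)."""
    micro_batches, current, current_tokens = [], [], 0
    for idx, n_tokens in enumerate(seq_lens):
        if current and current_tokens + n_tokens > max_token_len_per_gpu:
            micro_batches.append(current)
            current, current_tokens = [], 0
        current.append(idx)
        current_tokens += n_tokens
    if current:
        micro_batches.append(current)
    return micro_batches

print(pack_by_token_budget([6000, 5000, 9000, 2000, 4000], max_token_len_per_gpu=16384))
# [[0, 1], [2, 3, 4]]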

actor.ppo_max_token_len_per_gpu

bash
# Max tokens per GPU in one PPO batch; affects gradient accumulation
# Typically it should be: n * ${data.max_prompt_length} + ${data.max_response_length}
# oc.select: the default val for ref.log_prob_max_token_len_per_gpu
ppo_max_token_len_per_gpu: 16384

ref.log_prob_max_token_len_per_gpu

bash
# the max token length per GPU
# same as actor_rollout_ref.actor.ppo_max_token_len_per_gpu if it exists, otherwise 16384
log_prob_max_token_len_per_gpu: ${oc.select:actor_rollout_ref.actor.ppo_max_token_len_per_gpu,16384}

rollout.log_prob_max_token_len_per_gpu

bash
# max token length for log_prob computation
# same as actor_rollout_ref.actor.ppo_max_token_len_per_gpu if it exists, otherwise 16384
log_prob_max_token_len_per_gpu: ${oc.select:actor_rollout_ref.actor.ppo_max_token_len_per_gpu,16384}

Common Parameter Configurations

SGLang + YaRN Configuration

Environment variables

  • If the environment variable does not take effect, try running ray stop first and then retry.
bash
export SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN=1
+ray_kwargs.ray_init.runtime_env.SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN=1 \

Model configuration

  • Standard rope_scaling configuration
bash
+actor_rollout_ref.model.override_config.rope_scaling.type=yarn \
+actor_rollout_ref.model.override_config.rope_scaling.factor=1.5 \
+actor_rollout_ref.model.override_config.rope_scaling.original_max_position_embeddings=32768 \

SGLang configuration

  • context_length (here 49152 = 32768 * 1.5, matching the YaRN factor)
bash
export SGLANG_OVERRIDE_ARGS='{"rope_scaling": {"type": "yarn", "factor": 1.5, "original_max_position_embeddings": 32768}}'
+actor_rollout_ref.rollout.engine_kwargs.sglang.context_length=49152
+actor_rollout_ref.rollout.engine_kwargs.sglang.json_model_override_args="'${SGLANG_OVERRIDE_ARGS}'"

Megatron Recompute Configuration

For details, see the notes: key Transformer memory parameters - recomputation

bash
recompute_granularity=selective
recompute_modules='["core_attn", "mlp"]'
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_granularity="${recompute_granularity}"
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_modules="${recompute_modules}"

Model Merging

Official documentation: verl Using Checkpoints to Support Fault Tolerance Training

Model merging
  • Do not pass the --tie-word-embedding flag as shown on the official site: it skips saving lm_head.weight, and when the converted HF model is deployed, generation goes wrong (garbled output that does not stop).
  • The saved tokenizer_config.json does not contain the chat_template field, while the original base model's does; you can copy and paste it over (a sketch follows the merge script).

The merge script is as follows:

bash
local_dir="xxx"
target_dir="xxx"

python -m verl.model_merger merge \
    --backend megatron \
    --local_dir ${local_dir} \
    --target_dir ${target_dir} \
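
Since the merged tokenizer_config.json may be missing chat_template (see the note above), a minimal sketch for copying it over from the base model's config; both paths are placeholders:

python
import json

base_cfg_path = "xxx/base_model/tokenizer_config.json"    # original base model (placeholder path)
merged_cfg_path = "xxx/hfmodel/tokenizer_config.json"     # merged HF model (placeholder path)

with open(base_cfg_path) as f:
    chat_template = json.load(f)["chat_template"]

with open(merged_cfg_path) as f:
    merged_cfg = json.load(f)

merged_cfg["chat_template"] = chat_template
with open(merged_cfg_path, "w") as f:
    json.dump(merged_cfg, f, ensure_ascii=False, indent=2)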

Model Inference

Deploy as a local service; see the SGLang official documentation

bash
DEFAULT_SAMPLING_PARAMS='{"temperature": 0, "top_p": 1}'
model_path="xxx/hfmodel"
model_name="hmodel2"
python -m sglang.launch_server \
 --model-path ${model_path} \
 --host "127.0.0.1" \
 --port 6380 \
 --tensor-parallel-size 8 \
 --served-model-name ${model_name} \
 --preferred-sampling-params "${DEFAULT_SAMPLING_PARAMS}" \