Understanding the key parameters
Maximum number of tokens generated per turn
Reference article
Maximum model length (see the Yarn configuration below)
- rollout.max_model_len = rollout.prompt_length + rollout.response_length
  - i.e. it is determined by the rollout prompt + response lengths

Maximum number of decoded tokens
- the smaller of the following two values:
  - the configured maximum response length: rollout.response_length
  - the length still available for the current prompt: max_model_len - len(prompt_ids) - 1
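As a toy numeric illustration (all values below are made up), the decoding budget shrinks as the prompt grows:

# Toy numbers, not from any real config: prompt_length=4096, response_length=8192.
prompt_length, response_length = 4096, 8192
max_model_len = prompt_length + response_length          # 12288

def max_new_tokens(num_prompt_ids: int) -> int:
    # min(configured response length, room left in the context window)
    return min(response_length, max_model_len - num_prompt_ids - 1)

print(max_new_tokens(1000))   # 8192 -> limited by response_length
print(max_new_tokens(5000))   # 7287 -> limited by the remaining context

The actual computation in the rollout server looks like this: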
@ray.remote(num_cpus=1)
class SGLangHttpServer:
    def __init__(
        self,
        config: RolloutConfig,
        model_config: HFModelConfig,
        rollout_mode: RolloutMode,
        workers: list[ActorHandle],
        replica_rank: int,
        node_rank: int,
        nnodes: int,
        cuda_visible_devices: str,
    ):
        print(f"SGLang http server: {rollout_mode=}, {replica_rank=}, {node_rank=}, {nnodes=}, {cuda_visible_devices=}")
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices
        assert torch.cuda.is_available(), "SGLang http server should run on GPU node"
        self.config: RolloutConfig = omega_conf_to_dataclass(config)
        self.model_config: HFModelConfig = omega_conf_to_dataclass(model_config, dataclass_type=HFModelConfig)
        # max_model_len = prompt_length + response_length
        self.config.max_model_len = self.config.prompt_length + self.config.response_length
        self.rollout_mode = rollout_mode
        self.workers = workers
        self.replica_rank = replica_rank
        self.node_rank = node_rank
        self.nnodes = nnodes
        # ....

    async def generate(
        self,
        prompt_ids: torch.Tensor,
        sampling_params: dict[str, Any],
        request_id: str,
        image_data: Optional[list[Any]] = None,
    ) -> TokenOutput:
        """Generate sequence with token-in-token-out."""
        # cap the decode budget: min(response_length, room left in the context window)
        max_new_tokens = min(self.config.response_length, self.config.max_model_len - len(prompt_ids) - 1)
        sampling_params["max_new_tokens"] = max_new_tokens
        return_logprob = sampling_params.pop("logprobs", False)
        request = GenerateReqInput(
            rid=request_id,
            input_ids=prompt_ids,
            sampling_params=sampling_params,
            return_logprob=return_logprob,
            image_data=image_data,
        )
        output = await self.tokenizer_manager.generate_request(request, None).__anext__()
        if return_logprob:
            output_token_logprobs = output["meta_info"]["output_token_logprobs"]
            log_probs, token_ids = zip(
                *[(log_prob, token_ids) for log_prob, token_ids, _ in output_token_logprobs], strict=True
            )
        else:
            token_ids = output["output_ids"]
            log_probs = None
        return TokenOutput(token_ids=token_ids, log_probs=log_probs)

batch_size-related parameters
Quick summary (a numeric sketch follows this list)
- data.train_batch_size
  - training batch size; multiplied by rollout.n it gives real_train_batch_size
- actor.ppo_mini_batch_size
  - when updating the model, the real_train_batch is split into multiple mini_batches; one real_train_batch drives several updates, one mini_batch drives exactly one update
- actor.ppo_micro_batch_size_per_gpu
  - when updating the model, each mini_batch is split into multiple micro_batches that are processed one after another with gradient accumulation; micro_batch_size = ppo_micro_batch_size_per_gpu
- rollout.log_prob_micro_batch_size_per_gpu
  - when computing old_log_prob, the real_train_batch is split into multiple micro_batches that are processed one after another
- ref.log_prob_micro_batch_size_per_gpu
  - when computing ref_log_prob, the real_train_batch is split into multiple micro_batches that are processed one after another
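A minimal numeric sketch of how these sizes relate (all values are hypothetical; the variable names mirror the config keys):

# All numbers are made up, chosen only to make the arithmetic concrete.
train_batch_size = 32                       # data.train_batch_size
rollout_n = 4                               # rollout.n (responses sampled per prompt)
ppo_mini_batch_size = 32                    # actor.ppo_mini_batch_size
ppo_micro_batch_size_per_gpu = 2            # actor.ppo_micro_batch_size_per_gpu
dp_world_size = 8                           # data-parallel ranks

real_train_batch_size = train_batch_size * rollout_n             # 128 sequences per step
num_mini_batches = real_train_batch_size // ppo_mini_batch_size  # 4 optimizer updates per step
per_gpu_mini_batch = ppo_mini_batch_size // dp_world_size        # 4 sequences per GPU per update
grad_accum_steps = per_gpu_mini_batch // ppo_micro_batch_size_per_gpu  # 2 micro-batches per update
print(real_train_batch_size, num_mini_batches, per_gpu_mini_batch, grad_accum_steps)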
data.train_batch_size
Meaning
- train_batch_size: batch size of the training set, used when iterating over the training data loader
- val_batch_size: analogous, not repeated here
Key point
- combined with rollout.n it defines real_train_batch_size: real_train_batch_size = train_batch_size * rollout.n
Reference notes
The training data is loaded and iterated over in chunks of train_batch_size:
self.train_dataloader = StatefulDataLoader(
    dataset=self.train_dataset,
    batch_size=self.config.data.get("gen_batch_size", self.config.data.train_batch_size),
    num_workers=num_workers,
    drop_last=True,
    collate_fn=collate_fn,
    sampler=train_sampler,
)

All subsequent rollout, update, etc. operations are then driven by this batch:
for epoch in range(current_epoch, self.config.trainer.total_epochs):
    for batch_dict in self.train_dataloader:
        # ...
        # ...
        gen_batch = self._get_gen_batch(batch)
        # pass global_steps to trace
        gen_batch.meta_info["global_steps"] = self.global_steps
        # repeat each prompt rollout.n times -> real_train_batch_size samples
        gen_batch_output = gen_batch.repeat(
            repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True
        )
        # other operations...
        # ....

actor.ppo_mini_batch_size
Preprocessing
- In its constructor, MegatronActorRolloutRefWorker automatically scales it up by a factor of n and then divides by the DP world size (a numeric sketch follows this list):
  - i.e. actor.ppo_mini_batch_size is multiplied by rollout.n
  - and then divided by get_data_parallel_world_size()
- MegatronActorRolloutRefWorker constructor notes
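A minimal sketch of that normalization with made-up numbers (the variable names are only illustrative, not the worker's actual attributes):

# Hypothetical values; the point is the order of operations, not the numbers.
ppo_mini_batch_size = 32      # actor.ppo_mini_batch_size as set in the config
rollout_n = 4                 # rollout.n
dp_world_size = 8             # get_data_parallel_world_size()

# scale up by n, then divide by the data-parallel world size
ppo_mini_batch_size *= rollout_n          # 128 sequences in total per mini-batch
ppo_mini_batch_size //= dp_world_size     # 16 sequences per DP rank per update
print(ppo_mini_batch_size)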
Meaning
- When updating the actor (or critic), the current real_train_batch is split up:
  - it is divided into multiple mini_batches of size ppo_mini_batch_size; every mini_batch triggers one model update
  - i.e. within one real_train_batch the model is updated several times, batch by batch
- real_train_batch_size -> ppo_mini_batch_size
- Usually combined with ppo_epochs, i.e. the batch is reused that many times. The critic works the same way:
  - the parameter is critic.ppo_mini_batch_size
Reference notes
ActorRolloutRefWorker.update_actor, update_actor main-entry notes
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
@GPUMemoryLogger(role="update_actor", logger=logger)
@DistProfiler.annotate(color="red")
def update_actor(self, data: DataProto):
    # ...
    micro_batch_size = self.config.actor.ppo_micro_batch_size_per_gpu
    data.meta_info["micro_batch_size"] = micro_batch_size
    # split the real_train_batch into mini-batches
    dataloader = self.actor.make_minibatch_iterator(data=data)
    with Timer(name="update_policy", logger=None) as timer:
        metrics = self.actor.update_policy(dataloader=dataloader)
    # ...

MegatronPPOActor.update_policy, update_policy notes
@GPUMemoryLogger(role="megatron actor", logger=logger)
def update_policy(self, dataloader: Iterable[DataProto]) -> dict:
    for data in dataloader:
        # within one real_train_batch the model is updated multiple times
        # ...
        self.actor_optimizer.zero_grad()
        calculate_entropy = self.config.entropy_coeff != 0
        micro_batch_size = self.config.ppo_micro_batch_size_per_gpu
        max_token_len = None
        if self.config.use_dynamic_bsz:
            max_token_len = self.config.ppo_max_token_len_per_gpu * self.config.megatron.context_parallel_size
        # ....
        # call forward_backward_batch
        metric_micro_batch = self.forward_backward_batch(
            data,
            calculate_entropy=calculate_entropy,
            use_dynamic_bsz=self.config.use_dynamic_bsz,
            micro_batch_size=micro_batch_size,
            max_token_len=max_token_len,
            mini_batch_size=self.config.ppo_mini_batch_size,
        )
        # ...
        # update the model
        update_successful, grad_norm, num_zeros_in_grad = self.actor_optimizer.step()
        # ...

CriticWorker.update_critic, CriticWorker.update_critic notes
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="critic"))
@DistProfiler.annotate(color="pink")
def update_critic(self, data: DataProto):
    data = data.to(get_device_id())
    # ...
    dataloader = self.critic.make_minibatch_iterator(data)
    with Timer(name="update_critic", logger=None) as timer:
        metrics = self.critic.update_critic(dataloader=dataloader)
    # ...

MegatronPPOCritic.update_critic works the same way; PPOCritic.update_critic notes
@GPUMemoryLogger("megatron critic", logger=logger)
def update_critic(self, dataloader: Iterable[DataProto]):
    metrics = {}
    for data in dataloader:
        self.critic_optimizer.zero_grad()
        micro_batch_size = self.config.ppo_micro_batch_size_per_gpu
        max_token_len = None
        if self.config.use_dynamic_bsz:
            max_token_len = self.config.ppo_max_token_len_per_gpu * self.config.megatron.context_parallel_size
        # the core part
        metric_micro_batch = self.forward_backward_batch(
            data,
            forward_only=False,
            use_dynamic_bsz=self.config.use_dynamic_bsz,
            micro_batch_size=micro_batch_size,
            max_token_len=max_token_len,
            mini_batch_size=self.config.ppo_mini_batch_size,
        )
        metric_micro_batch = metric_micro_batch["output"]
        update_successful, grad_norm, num_zeros_in_grad = self.critic_optimizer.step()
        # ...

MegatronPPOActor.make_minibatch_iterator: make_mini_batch code notes
def make_minibatch_iterator(self, data: DataProto) -> Iterable[DataProto]:
    select_keys = [
        "responses",
        "input_ids",
        "attention_mask",
        "response_mask",
        "position_ids",
        "old_log_probs",
        "advantages",
    ]
    # ...
    # ...
    data = data.select(batch_keys=select_keys)
    # mini_batch_size together with ppo_epochs controls how often the batch is reused
    return data.make_iterator(
        mini_batch_size=self.config.ppo_mini_batch_size,
        epochs=self.config.ppo_epochs,
        seed=self.config.data_loader_seed,
        dataloader_kwargs={"shuffle": self.config.shuffle},
    )

actor.ppo_micro_batch_size_per_gpu (gradient accumulation)
Background
- A single forward-backward update over the whole mini-batch would OOM, so it is split into multiple steps.
- Gradient accumulation: compute the gradients but do not zero them; accumulate over several steps and apply a single optimizer update at the end.
Meaning
- When updating the actor (or critic), the current mini_batch is split up (a sketch of the accumulation loop follows this list):
  - it is divided into multiple micro-batches with micro_batch_size = ppo_micro_batch_size_per_gpu
  - this is essentially the number of gradient-accumulation steps
- real_train_batch_size -> ppo_mini_batch_size -> ppo_micro_batch_size_per_gpu
- The critic works the same way:
  - the parameter is critic.ppo_micro_batch_size_per_gpu
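For intuition, a minimal generic sketch of a gradient-accumulation loop over micro-batches (plain PyTorch, not verl's actual implementation; model, optimizer and loss_fn are assumed placeholders):

import torch

def update_on_mini_batch(model, optimizer, mini_batch, micro_batch_size, loss_fn):
    """Hypothetical helper: one optimizer step per mini-batch, gradients accumulated over micro-batches."""
    optimizer.zero_grad()
    # mini_batch: a tensor of inputs; grad-accum steps = number of micro-batches
    micro_batches = torch.split(mini_batch, micro_batch_size)
    for micro_batch in micro_batches:
        loss = loss_fn(model(micro_batch))
        # scale so the accumulated gradient approximates one pass over the full mini-batch
        (loss / len(micro_batches)).backward()
    optimizer.step()   # a single update per mini-batch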
The following three are analogous
- rollout.log_prob_micro_batch_size_per_gpu (computing old_log_prob)
- ref.log_prob_micro_batch_size_per_gpu (computing ref_log_prob)
- actor.ppo_micro_batch_size_per_gpu (updating the model; splits the mini-batch)
ActorRolloutRefWorker.update_actor, update_actor main-entry notes
def update_actor(self, data: DataProto):
    # ...
    # micro_batch_size is passed down via meta_info
    micro_batch_size = self.config.actor.ppo_micro_batch_size_per_gpu
    data.meta_info["micro_batch_size"] = micro_batch_size
    dataloader = self.actor.make_minibatch_iterator(data=data)
    with Timer(name="update_policy", logger=None) as timer:
        metrics = self.actor.update_policy(dataloader=dataloader)
    # ...

MegatronPPOActor.update_policy, update_policy notes
@GPUMemoryLogger(role="megatron actor", logger=logger)
def update_policy(self, dataloader: Iterable[DataProto]) -> dict:
    metrics = {}
    for data in dataloader:
        self.actor_optimizer.zero_grad()
        # ...
        calculate_entropy = self.config.entropy_coeff != 0
        if data.meta_info.get("micro_batch_size", None) is not None:
            micro_batch_size = data.meta_info["micro_batch_size"]
        else:
            micro_batch_size = self.config.ppo_micro_batch_size_per_gpu
        # call forward_backward_batch, see above for details
        metric_micro_batch = self.forward_backward_batch(
            data,
            calculate_entropy=calculate_entropy,
            use_dynamic_bsz=self.config.use_dynamic_bsz,
            micro_batch_size=micro_batch_size,
            max_token_len=max_token_len,
            mini_batch_size=self.config.ppo_mini_batch_size,
        )
        # collect loss and other metrics
        # ...
        # update the model
        update_successful, grad_norm, num_zeros_in_grad = self.actor_optimizer.step()
        data = {"actor/grad_norm": grad_norm}
        # ....

MegatronPPOActor.forward_backward_batch, forward_backward_batch notes
def forward_backward_batch(
    self,
    data: DataProto,
    forward_only=False,
    use_dynamic_bsz=False,
    micro_batch_size=None,
    max_token_len=None,
    mini_batch_size=None,
):
    # broadcast from last pp rank to all other pp ranks
    # ...
    mini_batch = data
    mini_batch.batch = mini_batch.batch.contiguous()
    # ...
    if use_dynamic_bsz:
        pass  # dynamic batch-size splitting elided here
    else:
        # split the mini-batch into micro-batches of size micro_batch_size
        micro_batches = mini_batch.batch.split(micro_batch_size)
        seq_len = micro_batches[0]["input_ids"].shape[1]
        total_seqlen = micro_batch_size * seq_len
    n_micro_batch = len(micro_batches)
    forward_backward_func = get_forward_backward_func()
    # batch should be a list of batches inside micro-batches
    batch_generator = make_batch_generator(micro_batches, vpp_size=len(self.critic_module))
    # TODO: we may use the new schedule instead
    # for flash-attn: (seq_len, batch_size, hidden_size) = (mbs*seq_len, 1, hidden_size)
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        losses_reduced = forward_backward_func(
            forward_step_func=forward_step,
            data_iterator=batch_generator,
            model=self.critic_module,
            num_microbatches=n_micro_batch,
            seq_length=total_seqlen,  # no use when input_shapes was set
            micro_batch_size=1,  # no use when input_shapes was set
            forward_only=forward_only,
        )
    else:
        losses_reduced = forward_backward_func(
            forward_step_func=forward_step,
            data_iterator=batch_generator,
            model=self.critic_module,
            num_microbatches=n_micro_batch,
            seq_length=total_seqlen,  # in use for pp = 1 #
            micro_batch_size=1,  # in use for pp = 1 #
            forward_only=forward_only,
        )
    # loss_reduces contains the stats returned from loss_func
    losses_reduced = {"output": losses_reduced}
    if use_dynamic_bsz:
        losses_reduced["indices"] = indices
    return losses_reduced

For the next step, see the detailed notes.
rollout.log_prob_micro_batch_size_per_gpu
Meaning
- Used when computing log_prob during the rollout phase
- The real_train_batch is split into multiple micro-batches with micro_batch_size = log_prob_micro_batch_size_per_gpu
- real_train_batch_size -> log_prob_micro_batch_size_per_gpu
- log_prob_micro_batch_size can usually be set much larger than ppo_micro_batch_size, because this pass is forward-only and does not need to hold activations for a backward pass
The following three are analogous
- rollout.log_prob_micro_batch_size_per_gpu (computing old_log_prob)
- ref.log_prob_micro_batch_size_per_gpu (computing ref_log_prob)
- actor.ppo_micro_batch_size_per_gpu (updating the model; splits the mini-batch)
ActorRolloutRefWorker.compute_log_prob, compute_log_prob notes
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
@GPUMemoryLogger(role="compute_log_prob", logger=logger)
@DistProfiler.annotate(color="blue")
def compute_log_prob(self, data: DataProto):
    data.meta_info["micro_batch_size"] = self.config.rollout.log_prob_micro_batch_size_per_gpu
    # ...
    # core part: delegate the computation to the megatron actor
    output, entropys = self.actor.compute_log_prob(data=data, calculate_entropy=True)
    # return format
    output = DataProto.from_dict(
        tensors={"old_log_probs": output, "entropys": entropys},
        meta_info={"temperature": self.config.rollout.temperature},
    )
    # ...
    return output

MegatronPPOActor.compute_log_prob, PPOActor compute_log_prob notes
MegatronPPOActor.forward_backward_batch, forward_backward_batch notes
(The forward_backward_batch code here is identical to the block shown above under actor.ppo_micro_batch_size_per_gpu.)

ref.log_prob_micro_batch_size_per_gpu
Meaning
- Used when computing ref_log_prob during the rollout phase
- The real_train_batch is split into multiple micro-batches with micro_batch_size = log_prob_micro_batch_size_per_gpu
- real_train_batch_size -> log_prob_micro_batch_size_per_gpu
The following three are analogous
- rollout.log_prob_micro_batch_size_per_gpu (computing old_log_prob)
- ref.log_prob_micro_batch_size_per_gpu (computing ref_log_prob)
- actor.ppo_micro_batch_size_per_gpu (updating the model; splits the mini-batch)
Key parameter constraints
Derived quantities
n_gpus = n_gpus_per_node * nnodes
model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size
megatron_dp = n_gpus // (model_parallel_size * context_parallel_size)
minimal_bsz = megatron_dp * actor.ppo_micro_batch_size_per_gpu
real_train_batch_size = data.train_batch_size * rollout.n
Constraints (a small validation sketch follows this list)
The real train batch size must be divisible by minimal_bsz
- real_train_batch_size % minimal_bsz == 0
The number of GPUs must be divisible by the parallel sizes (very important!)
- n_gpus % (model_parallel_size * context_parallel_size) == 0
TP constraint
- num_attention_heads % tensor_model_parallel_size == 0
PP constraint
- num_layers % pipeline_model_parallel_size == 0
EP constraint
- num_moe_experts % moe_router_num_groups == 0
CP constraint
- n_gpus % (model_parallel_size * context_parallel_size) == 0 (very important!)
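A small sketch that validates these constraints up front, using hypothetical values (the variable names mirror the formulas above, not any verl API):

# Hypothetical values; replace them with your actual job configuration.
n_gpus_per_node, nnodes = 8, 2
tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size = 4, 2, 1
ppo_micro_batch_size_per_gpu = 2
train_batch_size, rollout_n = 32, 4
num_attention_heads, num_layers = 32, 48

n_gpus = n_gpus_per_node * nnodes
model_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size
assert n_gpus % (model_parallel_size * context_parallel_size) == 0, "GPUs not divisible by parallel sizes"
megatron_dp = n_gpus // (model_parallel_size * context_parallel_size)
minimal_bsz = megatron_dp * ppo_micro_batch_size_per_gpu
real_train_batch_size = train_batch_size * rollout_n
assert real_train_batch_size % minimal_bsz == 0, "real_train_batch_size not divisible by minimal_bsz"
assert num_attention_heads % tensor_model_parallel_size == 0, "TP constraint violated"
assert num_layers % pipeline_model_parallel_size == 0, "PP constraint violated"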
max_token_len_per_gpu
Meaning
- As the name suggests, the maximum number of tokens handled per GPU.
Effect
- In forward_backward_batch it feeds into total_seqlen
  - total_seqlen = micro_batch_size * seq_len
- In practice it does not take effect in that path, because the shapes are fixed and padded in advance; see the forward_backward_batch notes. It matters when use_dynamic_bsz is enabled, where ppo_max_token_len_per_gpu bounds how many tokens each dynamically built micro-batch may hold.
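For intuition only, a generic greedy sketch of packing sequences into micro-batches under a per-GPU token budget; this is an illustrative algorithm, not verl's actual dynamic-bsz implementation:

def split_by_token_budget(seq_lens: list[int], max_token_len: int) -> list[list[int]]:
    """Greedy split: pack sequence indices into micro-batches whose total length stays under the cap.
    Purely illustrative; the real dynamic-bsz logic (and its index bookkeeping) is more involved."""
    micro_batches, current, current_tokens = [], [], 0
    for idx, length in enumerate(seq_lens):
        if current and current_tokens + length > max_token_len:
            micro_batches.append(current)
            current, current_tokens = [], 0
        current.append(idx)
        current_tokens += length
    if current:
        micro_batches.append(current)
    return micro_batches

# e.g. sequences of length 6k/5k/4k/8k with a 16384-token budget -> [[0, 1, 2], [3]]
print(split_by_token_budget([6000, 5000, 4000, 8000], max_token_len=16384))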
actor.ppo_max_token_len_per_gpu
# Max tokens per GPU in one PPO batch; affects gradient accumulation
# Typically it should be: n * ${data.max_prompt_length} + ${data.max_response_length}
# oc.select: the default val for ref.log_prob_max_token_len_per_gpu
ppo_max_token_len_per_gpu: 16384

ref.log_prob_max_token_len_per_gpu
# the max token length per GPU
# same as actor_rollout_ref.actor.ppo_max_token_len_per_gpu if it exists, otherwise 16384
log_prob_max_token_len_per_gpu: ${oc.select:actor_rollout_ref.actor.ppo_max_token_len_per_gpu,16384}

rollout.log_prob_max_token_len_per_gpu
# max token length for log_prob computation
# same as actor_rollout_ref.actor.ppo_max_token_len_per_gpu if it exists, otherwise 16384
log_prob_max_token_len_per_gpu: ${oc.select:actor_rollout_ref.actor.ppo_max_token_len_per_gpu,16384}
Common parameter configurations
SGLang + Yarn configuration
Environment variables
- If the environment variable does not take effect, try running ray stop first and then try again.
export SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN=1
+ray_kwargs.ray_init.runtime_env.SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN=1 \

Model configuration
- standard rope_scaling configuration
+actor_rollout_ref.model.override_config.rope_scaling.type=yarn \
+actor_rollout_ref.model.override_config.rope_scaling.factor=1.5 \
+actor_rollout_ref.model.override_config.rope_scaling.original_max_position_embeddings=32768 \

SGLang configuration
context_length
export SGLANG_OVERRIDE_ARGS='{"rope_scaling": {"type": "yarn", "factor": 1.5, "original_max_position_embeddings": 32768}}'
+actor_rollout_ref.rollout.engine_kwargs.sglang.context_length=49152
+actor_rollout_ref.rollout.engine_kwargs.sglang.json_model_override_args="'${SGLANG_OVERRIDE_ARGS}'"
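A quick sanity check of how these numbers fit together (a sketch; the 1.5 and 32768 come from the flags above, and factor x original length is the standard Yarn scaling semantics):

# Yarn extends the usable context: original_max_position_embeddings * factor
original_max_position_embeddings = 32768
factor = 1.5
extended_context = int(original_max_position_embeddings * factor)   # 49152
# This should match the SGLang context_length above and cover prompt + response:
#   rollout.max_model_len = rollout.prompt_length + rollout.response_length <= 49152
assert extended_context == 49152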
Megatron recompute (activation checkpointing) configuration
For details see the notes: key Transformer memory parameters - recomputation
recompute_granularity=selective
recompute_modules='["core_attn", "mlp"]'
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_granularity="${recompute_granularity}"
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_modules="${recompute_modules}"

Model merging
Official docs: verl "Using Checkpoints to Support Fault Tolerance Training"
- Do NOT pass the --tie-word-embedding flag the way the official docs show: it causes lm_head.weight not to be saved, so after converting to an HF model for deployment the predictions go wrong (garbled output that never stops).
- The saved tokenizer_config.json is missing the chat_template field that the original base model has; you can copy-paste it over (see the sketch below).
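A minimal sketch for copying chat_template over from the base model's tokenizer_config.json (the paths are placeholders):

import json
from pathlib import Path

base_dir = Path("xxx/base_model")      # original base model (placeholder path)
merged_dir = Path("xxx/hfmodel")       # merged HF model produced by verl.model_merger (placeholder path)

base_cfg = json.loads((base_dir / "tokenizer_config.json").read_text())
merged_cfg_path = merged_dir / "tokenizer_config.json"
merged_cfg = json.loads(merged_cfg_path.read_text())

# copy the missing chat_template over, then write the file back
merged_cfg["chat_template"] = base_cfg["chat_template"]
merged_cfg_path.write_text(json.dumps(merged_cfg, ensure_ascii=False, indent=2))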
The merge script is as follows:
local_dir="xxx"
target_dir="xxx"
python -m verl.model_merger merge \
--backend megatron \
--local_dir ${local_dir} \
--target_dir ${target_dir} \

Model inference
Deploy it as a local service; see the official SGLang documentation.
DEFAULT_SAMPLING_PARAMS='{"temperature": 0, "top_p": 1}'
model_path="xxx/hfmodel"
model_name="hmodel2"
python -m sglang.launch_server \
--model-path ${model_path} \
--host "127.0.0.1" \
--port 6380 \
--tensor-parallel-size 8 \
--served-model-name ${model_name} \
--preferred-sampling-params "${DEFAULT_SAMPLING_PARAMS}" \
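Once the server is up, one way to smoke-test it is through SGLang's OpenAI-compatible endpoint; a minimal sketch, assuming the port and served model name from the launch command above:

# pip install openai; assumes the server above is listening on 127.0.0.1:6380
from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:6380/v1", api_key="EMPTY")
resp = client.chat.completions.create(
    model="hmodel2",                      # must match --served-model-name
    messages=[{"role": "user", "content": "Hello"}],
    temperature=0,
)
print(resp.choices[0].message.content)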