本文为《类ChatGPT逐行代码解读》系列的第二篇,上一篇是:如何从零起步实现Transformer、ChatGLM
本文两个模型的特点是加了RLHF
第六部分 LLaMA的RLHF版:ChatLLaMA(英文版)
由于LLaMA没有使用RLHF方法,初创公司 Nebuly AI开源了RLHF版的LLaMA,即ChatLLaMA
6.1 三套数据集:分别训练actor、reward、rlhf
其训练过程类似 ChatGPT,而通过本博客内的《ChatGPT技术原理解析》3.1节,可知训练三个模型(SFT、RM、RL/PPO)得先准备三套数据集
6.1.1 actor_training_data,即用于微调GPT3所用的数据
actor_training_data,即用于微调GPT3所用的数据,比如
[
{
"user_input": "here the input of the user",
"completion": "here the model completion"
}
]
actor_training_data如何而来呢,有4项途径
- 使用 100% 合成数据,可以通过运行以下命令综合生成数据集:
python artifacts/generate_actor_dataset.py,注:此命令需要订阅OpenAI,生成完整数据集的davinci-003成本约为 200 美元(当然 也有免费的途径) - 使用具有辅助交互的开源数据集之一,目前支持:
Anthropic HH RLHF:这个数据集由结构化的 {question/answer pairs} 组成,包括机器人选择和拒绝的答案;
Stanford Human Preferences Dataset (SHP):这个数据集是从选定的“提问”subreddits 中挑选出来的,并且包括基于最受支持的回答的范围广泛的 {question/answer pairs} 的问题
可以运行以下命令下载数据集:python artifacts/download_dataset.py <dataset_name> --path <path_to_folder_for_download> --number_of_samples <N>
其中:
<dataset_name>对于 StanfordNLP/SHP 数据集,可以是“SHP”或“ARLHF”,对于 Anthropic/hh-rlhf 数据集,可以分别是“SHP”或“ARLHF”;
<path_to_folder_for_download>是要创建数据集的文件夹路径;
<N>是组成 reward_dataset.json 的样本数 -
使用 100% 个性化数据集
用户提供自己的个性化完整数据集,数据集必须是具有以下格式的 JSON 文件:
[
{
"user_input": "here the input of the user",
"completion": "here the model completion"
}
]
其中列表包含多个dictionaries,每个dictionary 对应一个数据样本,建议使用超过 1000 个数据样本来进行对actor的训练 -
创建完整的数据集,增加一些自定义数据样本,数据集可以从用户提供的一些提示+响应示例中综合生成(少数 => 10)
6.1.2 reward_training_data,用于训练一个奖励模型的数据
reward_training_data,用于训练一个奖励模型的数据,包含三部分的数据:
i) prompts,
ii) completion
iii) score of the completion assigned accordingly to the user feedback (the Human Feedback in RLHF,即对各个回答的评分score)
示例如下
[{
"user_input": "...",
"completion": "...",
"score": 1
},
...
]
同样的,奖励数据怎么来呢?有以下三种方式
- be synthetically scored using a LLM as Human Feedback
LLM 模型用于为每个entry计算分数
为此,LLM 需要一个提示模板,其中包含评估生成的文本的所有说明(比如奖励规则,什么情况下该奖 什么情况下不奖都得十分明确)。为此,您应该将key reward添加到文件中templates.json,比如:{
"reward": "Here is the template for the reward model. The rules are:\n\n1.Rule 1\n\n2. Rule 2"
}
如果未提供模板,则使用默认模板artifacts/generate_rewards.py,注:所有模板都必须保存在一个名为 .json 的 JSON 文件中templates.json获得unlabelled dataset后,您可以通过运行以下命令生成分数:
python artifacts/generate_rewards.py <dataset_path> --model <model_to_use> --temperature <t> --max_tokens <n> --reward_template <path_to_file.json>
其中,<dataset_path>要评分的reward dataset的路径;
<model_to_use>用于奖励的模型,默认建议使用text-davinci-003
<temperature>用于对模型进行评分的temperature,temperature =0.1;
<max_tokens>
<reward_template>,这是包含用于生成奖励的模板的文件的路径,如果未提供路径,将使用默认模版
这里值得注意的是,与instructGPT中的「人类通过对模型的输出进行排序,然后利用这些排序数据去训练一个RM」不同,ChatLLaMA直接训练一个RM对模型的输出进行打分 比如0-5分,且与人类的打分做MSE损失(减少RM打分与人类打分之间的差距)REWARD_TEMPLATE = dict( template=( "You have to evaluate the following chat with a score" "between 0 and 5"
最后,可能你会问,从哪里看出来的用的MSE损失,答案是从另外一个文件里看出来的(具体是chatllama/rlhf/reward.py 文件的第282行)
class RewardTrainer: """Class to train the reward model def __init__(self, config: ConfigReward) -> None: # save the config self.config = config # load the model self.reward = RewardModel(config) # optimizer self.optimizer = torch.optim.AdamW( self.reward.parameters(), lr=config.lr ) # loss function self.loss_function = torch.nn.MSELoss() // ...
-
用户提供他们个性化的完整数据集(至少需要 100 个数据样本),但数据集必须是以下格式的 JSON 文件,取名为:reward_training_data.json
[ { "user_input": "here type the user input", "completion": "here type the completion", "score": 4.0 }, { "user_input": "here type the user input", "completion": "random garbage", "score": 0.0 } ]
-
用户提供的少量示例和使用 LLM 综合扩展的数据集(通过self-instruct的方式提示LLM产生更多所需要的指令数据)
6.1.3 rlhf_training_data,通过RL方法不断优化迭代最优策略的数据
It can be provided in 2 different ways:
- Few examples provided by the user and dataset synthetically expanded using LLM(依然可以
继续通过self-instruct的方式提示LLM产生更多所需要的指令数据)
需要将key rlhf添加到templates.json文件中,其中包含有关要执行的任务的信息以及 LLM 生成所需的额外上下文,这是模板的示例(所有模板必须保存在一个名为templates.json):{
"rlhf": "Here is the template for the generating RLHF prompts. The task we want to perform is ..."
} -
The user provides the full dataset with possible interactions with the model
数据集需要包含超过 1000 个提示示例(文件命名为rlhf_training_data.json):
[
{
"user_input": "here the example of user input"
}
]
6.2 训练流程:SFT、RM、RL/PPO训练三步骤
6.2.1 RewardTrainer 类的 train 方法训练一个奖励函数
chatllama/rlhf/reward.py中
首先定义了一个名为 Reward Model 的类,作为奖励模型或批评者模型(Critic Model)。Reward Model 是一个基于语言模型的模型,附加了一个头部head,用于预测给定的 token 序列的奖励(一个标量值),最后将CriticModel类设置为RewardModel类,以保持命名一致性
之后,定义类:RewardDatase用于训练奖励模型的数据集
RewardDataset 类是一个继承自 Dataset 的自定义数据集类,它的作用是从给定的 JSON 文件中读取数据,并将数据整理成适当的格式。JSON 文件应包含以下格式的数据:
class RewardDataset(Dataset):
"""Dataset class for the reward model
read a json file with the following format:
[
{
"user_input": "...",
"completion": "...",
"score": ...
},
...
]
Where:
user_input: the initial input of the user
completion: the completion generated by the model
score: the score given by the user to the completion (or by the LLM)
"""
其中 user_input 是用户的初始输入,completion 是模型生成的补全,而 score 是用户或LLM给予补全的分数
再定义一个RewardTrainer 类用于训练奖励模型,它初始化奖励模型、优化器、损失函数(具体如上文所说,或如282行所述的MSE损失函数)、数据集和数据加载器等。此外,它还支持使用 DeepSpeed 或 Accelerate(两种高性能深度学习训练框架)进行训练
RewardTrainer 类的主要方法有:
train:训练奖励模型。它执行训练循环,包括前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,它还可以对模型进行验证(如果提供了验证数据集的话)
- 首先是初始化
# 定义构造函数 def __init__(self, config: ConfigReward) -> None: # 保存配置对象 self.config = config # 加载模型 self.reward = RewardModel(config) # 创建优化器 self.optimizer = torch.optim.AdamW( self.reward.parameters(), lr=config.lr ) # 定义损失函数,用的交叉熵损失 self.loss_function = torch.nn.MSELoss() # 检查验证数据集是否存在 self.validation_flag = False if config.validation_dataset_path is not None: self.validation_flag = True # 创建数据集和数据加载器 self.train_dataset = RewardDataset(config.train_dataset_path) self.train_dataloader = DataLoader( self.train_dataset, batch_size=config.batch_size ) # 如果有验证数据集,则创建验证数据集和数据加载器 if self.validation_flag: self.eval_dataset = RewardDataset(config.validation_dataset_path) self.validation_dataloader = DataLoader( self.eval_dataset, batch_size=config.batch_size ) # 初始化学习率调度器 - 学习率将下降到初始值的10% self.scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts( self.optimizer, T_0=len(self.train_dataset) // config.batch_size, T_mult=1, eta_min=config.lr * 0.1, last_epoch=-1, )
- save_checkpoint:保存模型的检查点。在训练过程中,可以定期将模型的当前状态(包括模型参数、优化器状态和训练统计信息)保存为检查点,以便在需要时恢复训练
- load_checkpoint:从检查点恢复模型。如果在训练过程中找到检查点文件,则该方法将从检查点恢复模型状态,并返回从何处恢复训练的周期和步骤
- 接下来,是具体的训练过程
def train( self, ) -> None: # 训练奖励模型 # # 打印开始训练奖励模型的消息 print("Start Training the Reward Model") # 如果启用了 DeepSpeed,从训练数据加载器获取批次大小 if self.config.deepspeed_enable: batch_size = self.train_dataloader.batch_size else: # 否则 batch_size = self.config.batch_size # 从配置获取批次大小 epochs = self.config.epochs # 从配置获取训练轮次 device = self.config.device # 从配置获取设备 # 从配置获取每次打印的迭代次数 iteration_per_print = self.config.iteration_per_print # 从配置获取检查点步骤 checkpoint_steps = self.config.checkpoint_steps # 计算训练数据集的迭代次数 n_iter = int(len(self.train_dataset) / batch_size) # 加载检查点并获取开始轮次和开始步骤 start_epoch, start_step = self.load_checkpoint() # 初始化检查点计数器 cnt_checkpoints = 1 # 训练循环 for epoch in range(start_epoch, epochs): # 对于每个轮次 self.reward.train() # 将奖励模型设置为训练模式 # 遍历训练数据加载器中的每个输入 for i,inputs in enumerate(self.train_dataloader): # 如果从检查点恢复,则跳过步骤 if i < start_step: continue # 获取输入 input_text = inputs[0] # 获取输入文本 score = inputs[1] # 获取分数 # 对输入进行分词 with torch.no_grad(): # 禁用梯度计算 input_tokens = self.reward.tokenizer( input_text, return_tensors="pt", truncation=True, padding=True, ) # 对输入文本进行分词 output = torch.as_tensor( score, dtype=torch.float32, device=device ) # 将分数转换为张量 # 前向传播 if self.config.deepspeed_enable: # 如果启用了 DeepSpeed est_output = self.model_engine( input_tokens["input_ids"].to(device), input_tokens["attention_mask"].to(device), )[:, -1] # 使用模型引擎进行前向传播 else: # 否则 est_output = self.reward.get_reward( input_tokens["input_ids"].to(device), input_tokens["attention_mask"].to(device), ) # 使用奖励模型进行前向传播 # 计算损失函数 loss = self.loss_function(est_output, output) # 将损失添加到训练统计数据中 self.training_stats.training_loss.append(loss.item()) # 反向传播 if self.config.deepspeed_enable: # 如果启用了 DeepSpeed self.model_engine.backward(loss) # 使用模型引擎进行反向传播 self.model_engine.step() # 更新模型参数 elif self.config.accelerate_enable: # 如果启用了加速 self.optimizer.zero_grad() # 将优化器的梯度归零 self.accelerator.backward(loss) # 使用加速器进行反向传播 self.optimizer.step() # 更新模型参数 self.scheduler.step() # 更新学习率调度器 else: # 否则 self.optimizer.zero_grad() # 将优化器的梯度归零 loss.backward() # 进行反向传播 self.optimizer.step() # 更新模型参数 self.scheduler.step() # 更新学习率调度器 # 打印进度,如果当前迭代次数是打印间隔的整数倍 if i % iteration_per_print == 0: print( f"Epoch: {epoch+1}/{epochs}, " f"Iteration: {i+1}/{n_iter}, " f"Training Loss: {loss.item()}" ) # 打印训练进度 printed_est_output = [ round(float(x), 1) for x in est_output.cpu().tolist() ] # 对估计输出进行四舍五入 print( "prediction", printed_est_output, "target", score.cpu().tolist(), ) # 打印预测值和目标值 # 保存检查点,如果检查点计数器是检查点步骤的整数倍 if cnt_checkpoints % checkpoint_steps == 0: self.save_checkpoint(epoch, i, epochs, n_iter) # 保存检查点 cnt_checkpoints = 1 # 重置检查点计数器 else: # 否则 cnt_checkpoints += 1 # 检查点计数器加一 # 验证 if self.validation_flag: # 如果启用了验证 self.reward.eval() # 将奖励模型设置为评估模式 with torch.no_grad(): # 禁用梯度计算 for i, (text, score) in enumerate( self.validation_dataloader ): # 遍历验证数据加载器中的每个输入 # 对输入进行分词 input_tokens = self.reward.tokenizer( text, return_tensors="pt", padding=True ) # 对输入文本进行分词 input_tokens = input_tokens.to(device) # 将输入令牌移动到设备上 # TODO: 检查输入令牌的长度,如果过长可能会导致问题 output = torch.tensor(score, dtype=torch.float32).to( device ) # 将分数转换为张量并移动到设备上 # 前向传播 est_output = self.reward.get_reward( input_tokens["input_ids"], input_tokens["attention_mask"], ) # 使用奖励模型进行前向传播 # 计算损失函数 loss = self.loss_function(est_output, output) # 将损失添加到训练统计数据中 self.training_stats.validation_loss.append(loss.item()) # 打印进度,如果当前迭代次数是打印间隔的整数倍 if i % iteration_per_print == 0: print( f"Epoch: {epoch+1}/{epochs}, " f"Iteration: {i+1}/{n_iter}, " f"Validation Loss: {loss.item()}" ) # 打印验证进度 # 在恢复训练后重置 start_step start_step = 0 # 训练结束后保存模型 self.reward.save() # 保存奖励模型
总之,在 RewardTrainer 类的 train 方法中
首先会尝试从检查点恢复模型(如果有的话);
然后,它会遍历数据加载器中的所有输入,对每个输入执行前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,如果提供了验证数据集,还会对模型进行验证;
最后,在训练完成后,将保存模型
6.2.2 通过chatllama/rlhf/actor.py再训练一个actor
此外,项目通过chatllama/rlhf/actor.py再训练一个actor,比如通过train方法训练一个基于transformer的模型,它包括了数据处理、模型训练、验证和模型保存等操作
- 定义
train
方法,它没有返回值。 - 打印训练开始信息。
- 获取配置参数,包括批量大小、训练轮数、设备和检查点步数。
- 计算迭代次数。
- 加载模型检查点并获取开始的轮数和步数。
- 如果从头开始训练,清空训练统计。
- 初始化检查点计数器。
- 定义训练循环,其中包括:
- 设置模型为训练模式。
- 遍历训练数据加载器。
- 如果恢复训练,跳过已经完成的步数。
- 对输入文本进行标记化处理。
- 将输入文本分割成tokens和mask。
- 添加结束符(EOS)。
- 将输入文本分割成输入和输出。
- 将输入文本移至设备。
- 执行前向传播。
- 计算损失。
- 执行反向传播和优化。
- 打印训练进度。
- 定期保存检查点和训练统计。
- 进行验证(如果启用验证的话):
- 设置模型为评估模式。
- 使用
torch.no_grad()
禁用梯度计算。 - 遍历验证数据加载器。
- 对输入文本进行标记化处理。
- 将输入文本分割成验证输入和输出。
- 执行前向传播。
- 计算损失。
- 更新验证损失统计。
- 打印验证进度。
- 在恢复训练后,将
start_step
重置为0。 - 训练完成后,保存模型。
- 打印训练结束信息
6.2.3 通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络
有了奖励函数和actor,便可以通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络,具体如下图,设置内外两个循环
- 外层循环迭代训练轮次(epochs)
- 内层循环遍历数据加载器(dataloader)中的批次(batches),在每次迭代中,它会处理一批数据,包括状态、动作、价值等,这些数据用于训练智能体-评论家模型
在内层循环中依次做如下处理(以下代码来源于:chatllama/chatllama/rlhf/trainer.py ):
首先是导入必须的库和模块,当然,主要是ActorCritic类
- 导入所需的库和模块。
change_tokenization
函数:用于在两个不同的分词器之间转换给定的tokens。check_model_family
函数:检查两个配置是否属于相同的模型家族。ActorCritic
类:包含了actor和critic模型,并用于在训练actor过程中为给定的序列生成动作和值。它包括以下方法:__init__
:初始化actor和critic模型def __init__(self, config: Config) -> None: super().__init__() self.config = config self.actor = ActorModel(config.actor) # check if critic must be initialized from reward model ModelLoader.init_critic_from_reward(config.critic) self.critic = CriticModel(config.critic) # if the actor and critic use the same tokenizer is set to True self.use_same_tokenizer = False # debug flag self.debug = config.actor.
load
:加载模型,但未实现。save
:将模型保存到路径。forward
:基于给定的整个序列,使用actor的forward方法获取序列中每个token的logits,并使用critic的forward方法获取每个生成步骤的值。
这个代码主要用于强化学习训练自然语言生成模型。ActorCritic
类是其中的核心部分,它包含了actor和critic模型。这两个模型在训练过程中相互协作,用于生成动作和值。
其次,主要是关于一个用于生成动作、动作逻辑、价值和序列的生成函数,以及用于存储训练数据和生成训练示例的类
- 首先定义了一个名为generate的函数,它使用了@torch.no_grad()和@beartype修饰器。这个函数接收四个参数,分别是states_actor、states_mask_actor和states_critic,并返回一个元组
这个函数的主要目的是从输入状态生成动作、动作逻辑、价值和序列。它首先从actor模型生成动作序列,然后创建一个用于actor序列的mask。接下来,它检查是否需要为critic使用不同的编码。如果需要,它将使用change_tokenization函数来更改序列的编码。接着,它生成动作逻辑和价值。如果处于调试模式,将打印一些调试信息。 - 接下来,代码定义了一个名为Memory的namedtuple,用于存储每个经验的数据。Memory包含了11个字段,如states_actor、actions、values等。
- 然后定义了一个名为ExperienceDataset的类,它继承自torch.utils.data.Dataset。这个类用于训练actor-critic模型。它接收一个memories参数和一个device参数。memories参数是一个包含Memory实例的双端队列。device参数表示要在哪个设备上进行计算。这个类实现了__len__和__getitem__方法,使其可以像普通的PyTorch数据集一样使用。
- 最后,定义了一个名为ExamplesSampler的类,用于从JSON文件中读取示例并在需要时抽样。这个类接收一个表示文件路径的path参数。在初始化时,它从文件中读取数据,并将其存储在self.data中。它还实现了一个名为sample的方法,用于从数据中抽取指定数量的示例。
再之后,定义了一个名为 RLTrainer
的类,用于使用强化学习训练一个Actor-Critic模型。该类具有多个属性和方法,用于训练过程中的各种操作。
- 在
__init__
方法中,初始化了训练器的各个组件,包括Actor-Critic模型、actor和critic优化器、reward模型、用于存储训练统计数据和对话记录的类、以及示例采样器 save_checkpoint
方法保存了当前状态的Actor-Critic模型的检查点,包括当前的训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。load_checkpoint
方法加载了Actor-Critic模型的检查点,包括训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。如果没有找到检查点,则返回轮数0。如果actor和critic的检查点存在差异,则从两者中最小的轮数开始训练。
再之后,调用 learn
方法更新actor和critic模型,并保存训练统计数据和对话记录
- 使用智能体-评论家模型计算新的动作概率和价值
# get actor critic new probabilities and values actions_logits, values = self.actorcritic.forward( sequences_actor, sequences_mask_actor, sequences_critic, sequences_mask_critic, action_len_actor.item(), action_len_critic.item(), )
- 计算动作的对数概率、熵和KL散度损失
# get action log prob actions_prob = ( torch.softmax(actions_logits, dim=-1).max(dim=-1).values ) actions_log_prob = torch.log(actions_prob + self.eps) # compute entropy,一般表示为-sum「p(x)logp(x)」 entropies = (actions_prob * actions_log_prob).sum(dim=-1) # compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」 kl_div_loss = ( (actions_prob * (old_actions_log_probs - actions_log_prob)) .sum(dim=-1) .mean() )
- 计算重要性权重比率(ratios),即新旧策略的概率比
# compute ratios ratios = (actions_log_prob - old_actions_log_probs).exp()
- 计算PPO损失,包括优势函数的计算和PPO-clip算法的应用
首先我们回顾下强化学习极简入门一文里对『近端策略优化裁剪PPO-clip』的阐述 简单实现的话,即是
更具体的实现,则可以如下所示# ratios即为重要性权重,exp代表求期望,括号里的environment_log_probs代表用于与环境交互的策略 ratios = torch.exp(log_probs - environment_log_probs) # 分别用sur_1、sur_2来计算公式的两部分 # 第一部分是重要性权重乘以优势函数 sur_1 = ratios * advs # 第二部分是具体的裁剪过程 sur_2 = torch.clamp(ratios, 1 - clip_eps, 1 + clip_eps) * advs # 最终看谁更小则取谁 clip_loss = -torch.min(sur_1,sur_2).mean()
# 计算PPO总损失 if check_model_family(self.config.actor, self.config.critic): # 计算 TRL 中的折扣回报 gamma = self.config.trainer.gamma_discounted # 初始化折扣回报矩阵 discounted_rewards = torch.zeros_like(old_values) # 遍历每个时间步 for i in range(discounted_rewards.shape[1]): for j in range(i, discounted_rewards.shape[1]): # 计算折扣回报 discounted_rewards[:, i] += ( gamma ** (j - i) * rewards[:, j] ) # 计算优势值,与TRL 中旧值的符号相反 advantages = ( discounted_rewards - old_values ) # normalize advantages advantages = (advantages - advantages.mean(dim=-1)) / ( advantages.std() + self.eps ) surr1 = advantages * ratios else: advantages = rewards - old_values[:, -1] surr1 = advantages * ratios surr2 = ( torch.clamp(ratios, 1 - actor_eps_clip, 1 + actor_eps_clip) * advantages )
- 计算策略损失和总损失
可能有读者看到这里 看迷糊了,即咋出来两个损失函数了,看起来是一个策略损失,一个KL散度损失,其实你仔细一看上面的代码,你会发现本项目中的PPO 算法引入的这两个损失函数:policy_loss = -torch.min(surr1, surr2) - beta_s * entropies policy_loss = policy_loss.mean() loss = policy_loss + kl_div_loss
一个 policy loss,本质是一个目标函数(具体用的近端策略优化裁剪PPO-clip与熵的差值) 其中,表示在当前策略下采样得到的经验的无偏估计, 是策略比率,是优势函数, 是超参数,用于控制策略更新的幅度,是熵的系数
另一个 KL散度损失(kl_div_loss),用于限制新旧 policy 之间的差异,以免更新太快,导致学习不稳定最终,总的损失函数为:
其中, 是超参数,用于控制 KL 散度损失的权重
- 如果损失值为NaN,抛出异常
# check if loss item is NaN if torch.isnan(loss): raise ValueError("Loss is nan")
- 更新策略,包括使用DeepSpeed或Accelerate库进行优化
# 按照损失更新 actor 模型参数 # 使用 DeepSpeed 的 engine 对 loss 进行反向传播 if self.config.actor.deepspeed_enable: actor_model_engine.backward(loss) actor_model_engine.step() # 如果启用了 PyTorch 的 Accelerate elif self.config.actor.accelerate_enable: # 将 actor 模型参数的梯度清零 self.actor_optimizer.zero_grad() # 使用 Accelerate 对 loss 进行反向传播 actor_accelerator.backward(loss) # 使用 PyTorch 的优化器更新 actor 模型参数 self.actor_optimizer.step() # 使用 PyTorch 的学习率调度器更新学习率 self.actor_scheduler.step() else: self.actor_optimizer.zero_grad() # 对 loss 进行反向传播 loss.backward() self.actor_optimizer.step() self.actor_scheduler.step()
- 计算价值损失
# compute value loss # 损失是奖励与值之间的距离 # 我希望这个距离尽可能小,以便值能够代表奖励,因此我在两者之间取最大值 # 裁剪限制了值损失剪辑的变化速率 value_loss_clipped = old_values + (values - old_values).clamp( -critic_eps_clip, critic_eps_clip ) # 计算第一种值损失,即裁剪后的值与奖励之间的平方差 value_loss1 = (value_loss_clipped - rewards) ** 2 # 计算第二种值损失,即未裁剪的值与奖励之间的平方差 value_loss2 = (values - rewards) ** 2 # 选择两种值损失中较大的那个,并计算其均值 value_loss = torch.max(value_loss1, value_loss2).mean()
- 如果价值损失为NaN,抛出异常
if torch.isnan(value_loss): raise ValueError("Value loss is nan")
- 更新评论家,包括使用DeepSpeed或Accelerate库进行优化
# upate critic if self.config.critic.deepspeed_enable: critic_model_engine.backward(value_loss) critic_model_engine.step() elif self.config.critic.accelerate_enable: self.critic_optimizer.zero_grad() critic_accelerator.backward(loss) self.critic_optimizer.step() self.critic_scheduler.step() else: self.critic_optimizer.zero_grad() value_loss.backward() self.critic_optimizer.step() self.critic_scheduler.step()
- 将损失值追加到训练统计信息中
# 将训练损失值添加到训练统计信息中 self.training_stats.training_loss.append( # 将损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型 loss.detach().cpu().item() ) # 将价值损失值添加到训练统计信息中 self.training_stats.value_loss.append( # 将价值损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型 value_loss.detach().cpu().item() )
- 输出迭代信息
# print iteration info print( f"Epoch {epoch+1}/{epochs}", f"Step {k+1}/{int(len(dataloader) / batch_size)}", f"Loss {loss.detach().cpu().item():.4f}", f"Value Loss {value_loss.detach().cpu().item():.4f}", )
- 训练循环结束后,将智能体-评论家模型设为评估模式并输出训练结束信息
self.actorcritic.eval() print("End Learning")
最后的最后,定义了一个 train() 方法,使用 actor-critic 算法训练强化学习模型。方法首先初始化各种设置,如训练的总 episode 数量、每个 episode 的最大步数、批次大小和训练设备等。然后检查要用于学习的记忆数量是否是批次大小的倍数,以及总步数是否是更新步数的倍数。
该方法初始化记忆,加载检查点(如果有的话),如果是从头开始的新训练,则清除会话记录。然后循环遍历 episode 和 timestep,从示例数据集中抽取样本,为 actor 和 critic 进行分词,生成动作和值的序列,计算动作日志概率,计算奖励。存储每个 episode/timestep 的记忆,并将完成(解码后的动作)记录在会话日志中。
在一定数量的 timestep 后,使用记忆进行学习,并计算平均奖励。该过程重复进行,直到训练完成。该方法在训练结束时保存模型和会话日志。
第七部分 ColossalChat:通过self-instruct技术指令微调LLaMA且加上RLHF
7.1 技术架构:通过self-instruct生成的中英双语数据集 + 三阶段训练方式
据介绍(介绍页面,该页面的翻译之一,代码地址),Colossal-AI 开源了基于 LLaMA-7B 模型的包含完整 RLHF 流程的类 Chat 模型复现方案 ColossalChat
7.1.1 针对社交平台的种子数据且利用self-instruct 技术生成中英双语数据集
ColossalChat 收集并清洗了社交平台上人们的真实提问场景作为种子数据集,然后利用 self-instruct 技术扩充数据(通过prompt OpenAI API,花费约 900 美元进行标注),最终生成了10.4万条问答的中、英双语数据集(这是数据的开源地址)
他们的说法是,对比其他 self-instruct 方法生成的数据集,该数据集的种子数据更加真实、丰富,生成的数据集涵盖的话题更多,该数据可以同时用于微调和 RLHF 训练,通过高质量的数据,ColossalChat 能进行更好地对话交互,同时支持中文
7.1.2 ColossalChat训练方式:类似instructGPT/ChatGPT的训练三步骤
关于训练方式:类似instructGPT/ChatGPT的训练三步骤(如果忘了,务必复习下此文的3.1节)
- Stage1 是supervised-fintuning,即使用上文提到的数据集进行监督微调
- Stage2 训练一个奖励模型(初始化为阶段1的SFT模型),它通过模型对于同一个 prompt 的不同输出进行人工排序,根据排序结果监督训练出一个奖励模型
- Stage3 是通过阶段2训练出来的奖励函数微调出一个RL模型,微调过程中通过PPO算法限制RL模型的参数更新范围(以阶段1的SFT模型的策略为参考基准,PPO算法避免与基线模型SFT的策略偏离过远)
具体而言,为两个阶段进行:
- 如上图底部,首先是 Make Experience 部分,利用 SFT 、Actor、RM、Critic模型计算生成 Experience 存入 buffer 中; 之后是参数更新部分,利用 Experience 计算价值损失(value loss)和策略损失(policy loss),具体说明在此文的4.4.3节有介绍
- 如上图顶部即是PTX 部分(上面的目标函数中加在最后的偏置项)
ColossalChat 计算 Actor 的现有输出response 和预训练语料的回答部分的交叉熵损失函数(calculates the cross-entropy loss between the Actor’s output response and the response part of the input corpus)
用来在 PPO 梯度中加入预训练梯度(add pre-training gradients to the PPO gradient)
以保持语言模型比如GPT2原有的核心性能(maintain the language model’s original performance and prevent forgetting),防止忘了最早从哪里出发的(GPT2 SFT RM RLHF) - 最后将策略损失、价值损失和 PTX 损失加和(the policy loss, value loss, and PTX loss are summed up),进行反向传播和参数更新
7.2 代码实现:SFT模型 + 奖励模型 + PPO training
先看下整体的代码架构图
接下来,我们看下一些关键实现
7.2.1 首先,训练一个SFT模型
首先通过ColossalAI/applications/Chat/coati/trainer/sft.py,训练一个SFT模型
import math
import time
from abc import ABC
from typing import Optional
import loralib as lora
import torch
import torch.distributed as dist
import wandb
from coati.models.loss import GPTLMLoss
from torch import nn
from torch.optim import Adam, Optimizer
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.trainer import get_scheduler
from colossalai.logging import get_dist_logger
from .strategies import Strategy
from .utils import is_rank_0
class SFTTrainer(ABC):
"""
Trainer to use while training reward model.
Args:
model (torch.nn.Module): the model to train
strategy (Strategy): the strategy to use for training
optim(Optimizer): the optimizer to use for training
train_dataloader: the dataloader to use for training
eval_dataloader: the dataloader to use for evaluation
batch_size (int, defaults to 1): the batch size while training
max_epochs (int, defaults to 2): the number of epochs to train
optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer
"""
def __init__(
self,
model,
strategy: Strategy,
optim: Optimizer,
train_dataloader: DataLoader,
eval_dataloader: DataLoader = None,
batch_size: int = 1,
max_epochs: int = 2,
accimulation_steps: int = 8,
) -> None:
super().__init__()
self.strategy = strategy
self.epochs = max_epochs
self.train_dataloader = train_dataloader
self.eval_dataloader = eval_dataloader
self.model = strategy.setup_model(model)
if "DDP" in str(self.strategy):
self.model = self.model.module
self.optimizer = strategy.setup_optimizer(optim, self.model)
self.accimulation_steps = accimulation_steps
num_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps
max_steps = math.ceil(self.epochs * num_update_steps_per_epoch)
self.scheduler = get_scheduler("cosine",
self.optimizer,
num_warmup_steps=math.ceil(max_steps * 0.03),
num_training_steps=max_steps)
def fit(self, logger, log_interval=10):
wandb.init(project="Coati", name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
wandb.watch(self.model)
total_loss = 0
# epoch_bar = tqdm(range(self.epochs), desc='Epochs', disable=not is_rank_0())
step_bar = tqdm(range(len(self.train_dataloader) // self.accimulation_steps * self.epochs),
desc=f'steps',
disable=not is_rank_0())
for epoch in range(self.epochs):
# process_bar = tqdm(range(len(self.train_dataloader)), desc=f'Train process for{epoch}', disable=not is_rank_0())
# train
self.model.train()
for batch_id, batch in enumerate(self.train_dataloader):
prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
p_mask = batch["attention_mask"].to(torch.cuda.current_device())
labels = batch["labels"].to(torch.cuda.current_device())
# prompt_ids = prompt_ids.squeeze(1).cuda()
# p_mask = p_mask.squeeze(1).cuda()
# prompt_logits = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
loss = outputs.loss
prompt_logits = outputs.logits
if loss >= 2.5:
logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}")
loss = loss / self.accimulation_steps
self.strategy.backward(loss, self.model, self.optimizer)
total_loss += loss.item()
# gradient accumulation
if (batch_id + 1) % self.accimulation_steps == 0:
self.strategy.optimizer_step(self.optimizer)
self.optimizer.zero_grad()
self.scheduler.step()
wandb.log({
"loss": total_loss / self.accimulation_steps,
"lr": self.scheduler.get_last_lr()[0],
"epoch": epoch,
"batch_id": batch_id
})
total_loss = 0
step_bar.update()
# if batch_id % log_interval == 0:
# logger.info(f'Train Epoch {epoch}/{self.epochs} Batch {batch_id} Rank {dist.get_rank()} loss {loss.item()}')
# wandb.log({"loss": loss.item()})
# process_bar.update()
# eval
if self.eval_dataloader is not None:
self.model.eval()
with torch.no_grad():
loss_sum = 0
num_seen = 0
for batch in self.eval_dataloader:
prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
p_mask = batch["attention_mask"].to(torch.cuda.current_device())
labels = batch["labels"].to(torch.cuda.current_device())
# prompt_ids = prompt_ids.squeeze(1).cuda()
# p_mask = p_mask.squeeze(1).cuda()
outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
loss = outputs.loss
# prompt_logits = outputs.logits
loss_sum += loss.item()
num_seen += prompt_ids.size(0)
loss_mean = loss_sum / num_seen
if dist.get_rank() == 0:
logger.info(f'Eval Epoch {epoch}/{self.epochs} loss {loss_mean}')
# epoch_bar.update()
def save_model(self,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)
7.2.2 训练一个奖励模型
其次,通过ColossalAI/applications/Chat/coati/trainer/rm.py 训练一个奖励模型
from abc import ABC
from datetime import datetime
from typing import Optional
import pandas as pd
import torch
import torch.distributed as dist
from torch.optim import Optimizer, lr_scheduler
from torch.utils.data import DataLoader, Dataset, DistributedSampler
from tqdm import tqdm
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from .strategies import Strategy
from .utils import is_rank_0
class RewardModelTrainer(ABC):
"""
Trainer to use while training reward model.
Args:
这个类继承了 ABC 抽象基类。它接受以下参数:
model:待训练的模型
strategy:训练策略
optim:优化器
loss_fn:损失函数
train_dataset:训练数据集
valid_dataset:验证数据集
eval_dataset:评估数据集
batch_size:批次大小(默认为1)
max_epochs:最大训练轮数(默认为2)
"""
# 初始化 RewardModelTrainer 对象,配置模型、优化器、调度器等,并创建训练、验证和评估的数据加载器
def __init__(
self,
model,
strategy: Strategy,
optim: Optimizer,
loss_fn,
train_dataset: Dataset,
valid_dataset: Dataset,
eval_dataset: Dataset,
batch_size: int = 1,
max_epochs: int = 1,
) -> None:
super().__init__()
self.strategy = strategy
self.epochs = max_epochs
train_sampler = None
if dist.is_initialized() and dist.get_world_size() > 1:
train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)
self.train_dataloader = DataLoader(train_dataset,
shuffle=(train_sampler is None),
sampler=train_sampler,
batch_size=batch_size)
self.valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
self.eval_dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=True)
self.model = strategy.setup_model(model)
self.loss_fn = loss_fn
self.optimizer = strategy.setup_optimizer(optim, self.model)
self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100)
# 计算给定数据加载器上的准确率,计算选定奖励与拒绝奖励之间的平均距离
def eval_acc(self, dataloader):
dist = 0
on = 0
cnt = 0
self.model.eval()
with torch.no_grad():
for chosen_ids, c_mask, reject_ids, r_mask in dataloader:
chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
reject_reward = self.model(reject_ids, attention_mask=r_mask)
for i in range(len(chosen_reward)):
cnt += 1
if chosen_reward[i] > reject_reward[i]:
on += 1
dist += (chosen_reward - reject_reward).mean().item()
dist_mean = dist / len(dataloader)
acc = on / cnt
self.model.train()
return dist_mean, acc
# 用于实际训练模型。在每个训练轮中,它会遍历 train_dataloader,计算损失并更新模型
# 每 100 步,它会计算验证集上的距离和准确率,并将其记录到日志文件中
# 同时,在每个训练轮结束时,计算评估集上的距离和准确率
def fit(self):
time = datetime.now()
epoch_bar = tqdm(range(self.epochs), desc='Train epoch', disable=not is_rank_0())
for epoch in range(self.epochs):
step_bar = tqdm(range(self.train_dataloader.__len__()),
desc='Train step of epoch %d' % epoch,
disable=not is_rank_0())
# train
self.model.train()
cnt = 0
acc = 0
dist = 0
for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader:
chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
reject_reward = self.model(reject_ids, attention_mask=r_mask)
loss = self.loss_fn(chosen_reward, reject_reward)
self.strategy.backward(loss, self.model, self.optimizer)
self.strategy.optimizer_step(self.optimizer)
self.optimizer.zero_grad()
cnt += 1
if cnt == 100:
self.scheduler.step()
dist, acc = self.eval_acc(self.valid_dataloader)
cnt = 0
if is_rank_0():
log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]],
columns=['step', 'loss', 'dist', 'acc'])
log.to_csv('log_%s.csv' % time, mode='a', header=False, index=False)
step_bar.update()
step_bar.set_postfix({'dist': dist, 'acc': acc})
# eval
dist, acc = self.eval_acc(self.eval_dataloader)
if is_rank_0():
log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], columns=['step', 'loss', 'dist', 'acc'])
log.to_csv('log.csv', mode='a', header=False, index=False)
epoch_bar.update()
step_bar.set_postfix({'dist': dist, 'acc': acc})
step_bar.close()
# 用于保存训练好的模型。可以选择仅在 rank 0 的进程上保存模型,并选择保存 tokenizer
def save_model(self,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)
7.2.3 通过trainer/ppo.py to start PPO training
最后,通过ColossalAI/applications/Chat/coati/trainer/ppo.py to start PPO training
from typing import Any, Callable, Dict, List, Optional
import torch
import torch.nn as nn
from coati.experience_maker import Experience, NaiveExperienceMaker
from coati.models.base import Actor, Critic
from coati.models.generation_utils import update_model_kwargs_fn
from coati.models.loss import PolicyLoss, ValueLoss
from coati.replay_buffer import NaiveReplayBuffer
from torch.optim import Optimizer
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from .base import Trainer
from .callbacks import Callback
from .strategies import Strategy
class PPOTrainer(Trainer):
"""
Trainer for PPO algorithm.
Args:
接受以下参数:
strategy:用于训练的策略
actor:PPO算法中的执行者(actor)模型
critic:PPO算法中的评论者(critic)模型
reward_model:用于计算句子奖励的奖励模型
initial_model:用于生成参考对数值的初始模型,以限制actor的更新
actor_optim:用于actor模型的优化器
critic_optim:用于critic模型的优化器
其他参数:控制训练过程的各种超参数,如kl系数、批次大小等
"""
def __init__(self,
strategy: Strategy,
actor: Actor,
critic: Critic,
reward_model: nn.Module,
initial_model: Actor,
actor_optim: Optimizer,
critic_optim: Optimizer,
kl_coef: float = 0.1,
ptx_coef: float = 0.9,
train_batch_size: int = 8,
buffer_limit: int = 0,
buffer_cpu_offload: bool = True,
eps_clip: float = 0.2,
value_clip: float = 0.4,
experience_batch_size: int = 8,
max_epochs: int = 1,
tokenizer: Optional[Callable[[Any], dict]] = None,
sample_replay_buffer: bool = False,
dataloader_pin_memory: bool = True,
callbacks: List[Callback] = [],
**generate_kwargs) -> None:
experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef)
replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload)
generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor)
super().__init__(strategy, experience_maker, replay_buffer, experience_batch_size, max_epochs, tokenizer,
sample_replay_buffer, dataloader_pin_memory, callbacks, **generate_kwargs)
self.actor = actor
self.critic = critic
self.actor_loss_fn = PolicyLoss(eps_clip)
self.critic_loss_fn = ValueLoss(value_clip)
self.ptx_loss_fn = nn.CrossEntropyLoss(ignore_index=-100)
self.ptx_coef = ptx_coef
self.actor_optim = actor_optim
self.critic_optim = critic_optim
'''
该方法根据经验(Experience)对象计算执行者(actor)和评论者(critic)的损失
并使用策略进行反向传播和优化器更新
'''
def training_step(self, experience: Experience) -> Dict[str, float]:
self.actor.train()
self.critic.train()
# policy loss
num_actions = experience.action_mask.size(1)
action_log_probs = self.actor(experience.sequences, num_actions, attention_mask=experience.attention_mask)
actor_loss = self.actor_loss_fn(action_log_probs,
experience.action_log_probs,
experience.advantages,
action_mask=experience.action_mask)
# ptx loss
if self.ptx_coef != 0:
ptx = next(iter(self.pretrain_dataloader))['input_ids'].to(torch.cuda.current_device())
label = next(iter(self.pretrain_dataloader))['labels'].to(torch.cuda.current_device())[:, 1:]
attention_mask = next(iter(self.pretrain_dataloader))['attention_mask'].to(torch.cuda.current_device())
ptx_log_probs = self.actor.get_base_model()(ptx, attention_mask=attention_mask)['logits'][..., :-1, :]
ptx_loss = self.ptx_loss_fn(ptx_log_probs.view(-1, ptx_log_probs.size(-1)), label.view(-1))
actor_loss = ptx_loss * self.ptx_coef + actor_loss * (1 - self.ptx_coef)
self.strategy.backward(actor_loss, self.actor, self.actor_optim)
self.strategy.optimizer_step(self.actor_optim)
self.actor_optim.zero_grad()
# value loss
values = self.critic(experience.sequences,
action_mask=experience.action_mask,
attention_mask=experience.attention_mask)
critic_loss = self.critic_loss_fn(values,
experience.values,
experience.reward,
action_mask=experience.action_mask)
self.strategy.backward(critic_loss, self.critic, self.critic_optim)
self.strategy.optimizer_step(self.critic_optim)
self.critic_optim.zero_grad()
return {'reward': experience.reward.mean().item()}
def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None:
origin_model = strategy._unwrap_actor(actor)
new_kwargs = {**generate_kwargs}
# use huggingface models method directly
if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'):
new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation
if 'update_model_kwargs_fn' not in generate_kwargs:
new_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn
return new_kwargs
def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer)
在获得最终模型权重后,还可通过量化降低推理硬件成本,并启动在线推理服务,仅需单张约 4GB 显存的 GPU 即可完成 70 亿参数模型推理服务部署
更多推荐
类ChatGPT逐行代码解读(2/2):从零起步实现ChatLLaMA和ColossalChat
发布评论