类ChatGPT逐行代码解读(2/2):从零起步实现ChatLLaMA和ColossalChat

编程知识 更新时间:2023-05-02 22:35:34

本文为《类ChatGPT逐行代码解读》系列的第二篇,上一篇是:如何从零起步实现Transformer、ChatGLM

本文两个模型的特点是加了RLHF


第六部分 LLaMA的RLHF版:ChatLLaMA(英文版)

由于LLaMA没有使用RLHF方法,初创公司 Nebuly AI开源了RLHF版的LLaMA,即ChatLLaMA

6.1 三套数据集:分别训练actor、reward、rlhf

其训练过程类似 ChatGPT,而通过本博客内的《ChatGPT技术原理解析》3.1节,可知训练三个模型(SFT、RM、RL/PPO)得先准备三套数据集

6.1.1 actor_training_data,即用于微调GPT3所用的数据

actor_training_data,即用于微调GPT3所用的数据,比如
[
  {
      "user_input": "here the input of the user",
      "completion": "here the model completion"
  }
]

actor_training_data如何而来呢,有4项途径

  1. 使用 100% 合成数据,可以通过运行以下命令综合生成数据集:
    python artifacts/generate_actor_dataset.py,注:此命令需要订阅OpenAI,生成完整数据集的davinci-003成本约为 200 美元(当然 也有免费的途径)
  2. 使用具有辅助交互的开源数据集之一,目前支持:

    Anthropic HH RLHF:这个数据集由结构化的 {question/answer pairs} 组成,包括机器人选择和拒绝的答案;
    Stanford Human Preferences Dataset (SHP):这个数据集是从选定的“提问”subreddits 中挑选出来的,并且包括基于最受支持的回答的范围广泛的 {question/answer pairs} 的问题
    可以运行以下命令下载数据集:

    python artifacts/download_dataset.py <dataset_name> --path <path_to_folder_for_download> --number_of_samples <N>

    其中:
    <dataset_name>对于 StanfordNLP/SHP 数据集,可以是“SHP”或“ARLHF”,对于 Anthropic/hh-rlhf 数据集,可以分别是“SHP”或“ARLHF”;
    <path_to_folder_for_download>是要创建数据集的文件夹路径;
    <N>是组成 reward_dataset.json 的样本数

  3. 使用 100% 个性化数据集
    用户提供自己的个性化完整数据集,数据集必须是具有以下格式的 JSON 文件:
    [
        {
            "user_input": "here the input of the user",
            "completion": "here the model completion"
        }
    ]

    其中列表包含多个dictionaries,每个dictionary 对应一个数据样本,建议使用超过 1000 个数据样本来进行对actor的训练

  4. 创建完整的数据集,增加一些自定义数据样本,数据集可以从用户提供的一些提示+响应示例中综合生成(少数 => 10)

6.1.2 reward_training_data,用于训练一个奖励模型的数据

reward_training_data,用于训练一个奖励模型的数据,包含三部分的数据: 
i) prompts,
ii) completion
iii) score of the completion assigned accordingly to the user feedback (the Human Feedback in RLHF,即对各个回答的评分score)

示例如下
[{
    "user_input": "...",
    "completion": "...",
    "score": 1
},
    ...
]


同样的,奖励数据怎么来呢?有以下三种方式

  1. be synthetically scored using a LLM as Human Feedback
    LLM 模型用于为每个entry计算分数
    为此,LLM 需要一个提示模板,其中包含评估生成的文本的所有说明(比如奖励规则,什么情况下该奖 什么情况下不奖都得十分明确)。为此,您应该将key reward添加到文件中templates.json,比如:

    {

        "reward": "Here is the template for the reward model. The rules are:\n\n1.Rule 1\n\n2. Rule 2"
    }

    如果未提供模板,则使用默认模板artifacts/generate_rewards.py,注:所有模板都必须保存在一个名为 .json 的 JSON 文件中templates.json

    获得unlabelled dataset后,您可以通过运行以下命令生成分数:

    python artifacts/generate_rewards.py <dataset_path> --model <model_to_use> --temperature <t> --max_tokens <n> --reward_template <path_to_file.json>

    其中,<dataset_path>要评分的reward dataset的路径;
    <model_to_use>用于奖励的模型,默认建议使用text-davinci-003
    <temperature>用于对模型进行评分的temperature,temperature =0.1;
    <max_tokens>
    <reward_template>,这是包含用于生成奖励的模板的文件的路径,如果未提供路径,将使用默认模版

    这里值得注意的是,与instructGPT中的「人类通过对模型的输出进行排序,然后利用这些排序数据去训练一个RM」不同,ChatLLaMA直接训练一个RM对模型的输出进行打分 比如0-5分,且与人类的打分做MSE损失(减少RM打分与人类打分之间的差距)

        REWARD_TEMPLATE = dict(
            template=(
                "You have to evaluate the following chat with a score"
                "between 0 and 5"

    最后,可能你会问,从哪里看出来的用的MSE损失,答案是从另外一个文件里看出来的(具体是chatllama/rlhf/reward.py 文件的第282行) 

    class RewardTrainer:
        """Class to train the reward model
        def __init__(self, config: ConfigReward) -> None:
    
            # save the config
            self.config = config
    
            # load the model
            self.reward = RewardModel(config)
    
            # optimizer
            self.optimizer = torch.optim.AdamW(
                self.reward.parameters(), lr=config.lr
            )
    
            # loss function
            self.loss_function = torch.nn.MSELoss()
    
            // ...
  2. 用户提供他们个性化的完整数据集(至少需要 100 个数据样本),但数据集必须是以下格式的 JSON 文件,取名为:reward_training_data.json

    [
        {
            "user_input": "here type the user input",
            "completion": "here type the completion",
            "score": 4.0
        },
        {
            "user_input": "here type the user input",
            "completion": "random garbage",
            "score": 0.0
        }
    ]
  3. 用户提供的少量示例和使用 LLM 综合扩展的数据集(通过self-instruct的方式提示LLM产生更多所需要的指令数据)

6.1.3 rlhf_training_data,通过RL方法不断优化迭代最优策略的数据

It can be provided in 2 different ways:

  • Few examples provided by the user and dataset synthetically expanded using LLM(依然可以

    继续通过self-instruct的方式提示LLM产生更多所需要的指令数据)
    需要将key rlhf添加到templates.json文件中,其中包含有关要执行的任务的信息以及 LLM 生成所需的额外上下文,这是模板的示例(所有模板必须保存在一个名为templates.json):

    {
      "rlhf": "Here is the template for the generating RLHF prompts. The task we want to perform is ..."
    }

  • The user provides the full dataset with possible interactions with the model

    数据集需要包含超过 1000 个提示示例(文件命名为rlhf_training_data.json):

    [
        {
            "user_input": "here the example of user input"
        }
    ]

6.2 训练流程:SFT、RM、RL/PPO训练三步骤

6.2.1 RewardTrainer 类的 train 方法训练一个奖励函数

chatllama/rlhf/reward.py中

首先定义了一个名为 Reward Model 的类,作为奖励模型或批评者模型(Critic Model)。Reward Model 是一个基于语言模型的模型,附加了一个头部head,用于预测给定的 token 序列的奖励(一个标量值),最后将CriticModel类设置为RewardModel类,以保持命名一致性

之后,定义类:RewardDatase用于训练奖励模型的数据集
RewardDataset 类是一个继承自 Dataset 的自定义数据集类,它的作用是从给定的 JSON 文件中读取数据,并将数据整理成适当的格式。JSON 文件应包含以下格式的数据:

class RewardDataset(Dataset):
    """Dataset class for the reward model
    read a json file with the following format:
    [
        {
            "user_input": "...",
            "completion": "...",
            "score": ...
        },
        ...
    ]
    Where:
        user_input: the initial input of the user
        completion: the completion generated by the model
        score: the score given by the user to the completion (or by the LLM)
    """

其中 user_input 是用户的初始输入,completion 是模型生成的补全,而 score 是用户或LLM给予补全的分数

再定义一个RewardTrainer 类用于训练奖励模型,它初始化奖励模型、优化器、损失函数(具体如上文所说,或如282行所述的MSE损失函数)、数据集和数据加载器等。此外,它还支持使用 DeepSpeed 或 Accelerate(两种高性能深度学习训练框架)进行训练

RewardTrainer 类的主要方法有:
train:训练奖励模型。它执行训练循环,包括前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,它还可以对模型进行验证(如果提供了验证数据集的话)

  • 首先是初始化
    # 定义构造函数
    def __init__(self, config: ConfigReward) -> None:
    
        # 保存配置对象
        self.config = config
    
        # 加载模型
        self.reward = RewardModel(config)
    
        # 创建优化器
        self.optimizer = torch.optim.AdamW(
            self.reward.parameters(), lr=config.lr
        )
    
        # 定义损失函数,用的交叉熵损失
        self.loss_function = torch.nn.MSELoss()
    
        # 检查验证数据集是否存在
        self.validation_flag = False
        if config.validation_dataset_path is not None:
            self.validation_flag = True
    
        # 创建数据集和数据加载器
        self.train_dataset = RewardDataset(config.train_dataset_path)
        self.train_dataloader = DataLoader(
            self.train_dataset, batch_size=config.batch_size
        )
        # 如果有验证数据集,则创建验证数据集和数据加载器
        if self.validation_flag:
            self.eval_dataset = RewardDataset(config.validation_dataset_path)
            self.validation_dataloader = DataLoader(
                self.eval_dataset, batch_size=config.batch_size
            )
    
        # 初始化学习率调度器 - 学习率将下降到初始值的10%
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer,
            T_0=len(self.train_dataset) // config.batch_size,
            T_mult=1,
            eta_min=config.lr * 0.1,
            last_epoch=-1,
        )
  • save_checkpoint:保存模型的检查点。在训练过程中,可以定期将模型的当前状态(包括模型参数、优化器状态和训练统计信息)保存为检查点,以便在需要时恢复训练
  • load_checkpoint:从检查点恢复模型。如果在训练过程中找到检查点文件,则该方法将从检查点恢复模型状态,并返回从何处恢复训练的周期和步骤
  • 接下来,是具体的训练过程
    def train(
            self,
        ) -> None:
            # 训练奖励模型
            # # 打印开始训练奖励模型的消息
            print("Start Training the Reward Model") 
    
            # 如果启用了 DeepSpeed,从训练数据加载器获取批次大小
            if self.config.deepspeed_enable: 
                batch_size = self.train_dataloader.batch_size 
            else: # 否则
                batch_size = self.config.batch_size         # 从配置获取批次大小
            epochs = self.config.epochs                     # 从配置获取训练轮次
            device = self.config.device                     # 从配置获取设备
    
            # 从配置获取每次打印的迭代次数
            iteration_per_print = self.config.iteration_per_print 
            # 从配置获取检查点步骤
            checkpoint_steps = self.config.checkpoint_steps       
    
            # 计算训练数据集的迭代次数
            n_iter = int(len(self.train_dataset) / batch_size)  
            # 加载检查点并获取开始轮次和开始步骤
            start_epoch, start_step = self.load_checkpoint()    
    
            # 初始化检查点计数器
            cnt_checkpoints = 1                      
    
            # 训练循环
            for epoch in range(start_epoch, epochs): # 对于每个轮次
                self.reward.train()                  # 将奖励模型设置为训练模式
                # 遍历训练数据加载器中的每个输入
                for i,inputs in enumerate(self.train_dataloader): 
    
                # 如果从检查点恢复,则跳过步骤
                if i < start_step:
                    continue
    
                # 获取输入
                input_text = inputs[0]      # 获取输入文本
                score = inputs[1]           # 获取分数
    
                # 对输入进行分词
                with torch.no_grad():       # 禁用梯度计算
                    input_tokens = self.reward.tokenizer(
                        input_text,
                        return_tensors="pt",
                        truncation=True,
                        padding=True,
                    )  # 对输入文本进行分词
                    output = torch.as_tensor(
                        score, dtype=torch.float32, device=device
                    )  # 将分数转换为张量
    
                # 前向传播
                if self.config.deepspeed_enable:      # 如果启用了 DeepSpeed
                    est_output = self.model_engine(
                        input_tokens["input_ids"].to(device),
                        input_tokens["attention_mask"].to(device),
                    )[:, -1]          # 使用模型引擎进行前向传播
                else:  # 否则
                    est_output = self.reward.get_reward(
                        input_tokens["input_ids"].to(device),
                        input_tokens["attention_mask"].to(device),
                    )                 # 使用奖励模型进行前向传播
    
                # 计算损失函数
                loss = self.loss_function(est_output, output)        
                # 将损失添加到训练统计数据中  
                self.training_stats.training_loss.append(loss.item())  
    
                # 反向传播
                if self.config.deepspeed_enable:      # 如果启用了 DeepSpeed
                    self.model_engine.backward(loss)  # 使用模型引擎进行反向传播
                    self.model_engine.step()  # 更新模型参数
                elif self.config.accelerate_enable:   # 如果启用了加速
                    self.optimizer.zero_grad()        # 将优化器的梯度归零
                    self.accelerator.backward(loss)   # 使用加速器进行反向传播
                    self.optimizer.step()       # 更新模型参数
                    self.scheduler.step()       # 更新学习率调度器
                else:  # 否则
                    self.optimizer.zero_grad()  # 将优化器的梯度归零
                    loss.backward()             # 进行反向传播
                    self.optimizer.step()       # 更新模型参数
                    self.scheduler.step()       # 更新学习率调度器
    
                # 打印进度,如果当前迭代次数是打印间隔的整数倍
                if i % iteration_per_print == 0:  
                    print(
                        f"Epoch: {epoch+1}/{epochs}, "
                        f"Iteration: {i+1}/{n_iter}, "
                        f"Training Loss: {loss.item()}"
                    )  # 打印训练进度
                    printed_est_output = [
                        round(float(x), 1) for x in est_output.cpu().tolist()
                    ]          # 对估计输出进行四舍五入
                    print(
                        "prediction",
                        printed_est_output,
                        "target",
                        score.cpu().tolist(),
                    )          # 打印预测值和目标值
    
                # 保存检查点,如果检查点计数器是检查点步骤的整数倍
                if cnt_checkpoints % checkpoint_steps == 0:      
                    self.save_checkpoint(epoch, i, epochs, n_iter)  # 保存检查点
                    cnt_checkpoints = 1      # 重置检查点计数器
                else:  # 否则
                    cnt_checkpoints += 1    # 检查点计数器加一
    
            # 验证
            if self.validation_flag:        # 如果启用了验证
                self.reward.eval()          # 将奖励模型设置为评估模式
                with torch.no_grad():       # 禁用梯度计算
                    for i, (text, score) in enumerate(
                        self.validation_dataloader
                    ):  # 遍历验证数据加载器中的每个输入
    
                        # 对输入进行分词
                        input_tokens = self.reward.tokenizer(
                            text, return_tensors="pt", padding=True
                        )  # 对输入文本进行分词
                        input_tokens = input_tokens.to(device)  # 将输入令牌移动到设备上
                        # TODO: 检查输入令牌的长度,如果过长可能会导致问题
                        output = torch.tensor(score, dtype=torch.float32).to(
                            device
                        )  # 将分数转换为张量并移动到设备上
    
                        # 前向传播
                        est_output = self.reward.get_reward(
                            input_tokens["input_ids"],
                            input_tokens["attention_mask"],
                        )  # 使用奖励模型进行前向传播
    
                        # 计算损失函数
                        loss = self.loss_function(est_output, output)      
                        # 将损失添加到训练统计数据中
                        self.training_stats.validation_loss.append(loss.item())  
    
                        # 打印进度,如果当前迭代次数是打印间隔的整数倍
                        if i % iteration_per_print == 0:  
                            print(
                                f"Epoch: {epoch+1}/{epochs}, "
                                f"Iteration: {i+1}/{n_iter}, "
                                f"Validation Loss: {loss.item()}"
                            )  # 打印验证进度
    
            # 在恢复训练后重置 start_step
            start_step = 0
    
        # 训练结束后保存模型
        self.reward.save()      # 保存奖励模型

总之,在 RewardTrainer 类的 train 方法中
首先会尝试从检查点恢复模型(如果有的话);
然后,它会遍历数据加载器中的所有输入,对每个输入执行前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,如果提供了验证数据集,还会对模型进行验证;
最后,在训练完成后,将保存模型

6.2.2 通过chatllama/rlhf/actor.py再训练一个actor

此外,项目通过chatllama/rlhf/actor.py再训练一个actor,比如通过train方法训练一个基于transformer的模型,它包括了数据处理、模型训练、验证和模型保存等操作

  1. 定义train方法,它没有返回值。
  2. 打印训练开始信息。
  3. 获取配置参数,包括批量大小、训练轮数、设备和检查点步数。
  4. 计算迭代次数。
  5. 加载模型检查点并获取开始的轮数和步数。
  6. 如果从头开始训练,清空训练统计。
  7. 初始化检查点计数器。
  8. 定义训练循环,其中包括:
    • 设置模型为训练模式。
    • 遍历训练数据加载器。
    • 如果恢复训练,跳过已经完成的步数。
    • 对输入文本进行标记化处理。
    • 将输入文本分割成tokens和mask。
    • 添加结束符(EOS)。
    • 将输入文本分割成输入和输出。
    • 将输入文本移至设备。
    • 执行前向传播。
    • 计算损失。
    • 执行反向传播和优化。
    • 打印训练进度。
    • 定期保存检查点和训练统计。
  9. 进行验证(如果启用验证的话):
  10. 设置模型为评估模式。
  11. 使用torch.no_grad()禁用梯度计算。
  12. 遍历验证数据加载器。
  13. 对输入文本进行标记化处理。
  14. 将输入文本分割成验证输入和输出。
  15. 执行前向传播。
  16. 计算损失。
  17. 更新验证损失统计。
  18. 打印验证进度。
  19. 在恢复训练后,将start_step重置为0。
  20. 训练完成后,保存模型。
  21. 打印训练结束信息

6.2.3 通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络

有了奖励函数和actor,便可以通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络,具体如下图,设置内外两个循环

  • 外层循环迭代训练轮次(epochs)
  • 内层循环遍历数据加载器(dataloader)中的批次(batches),在每次迭代中,它会处理一批数据,包括状态、动作、价值等,这些数据用于训练智能体-评论家模型

在内层循环中依次做如下处理(以下代码来源于:chatllama/chatllama/rlhf/trainer.py ):

首先是导入必须的库和模块,当然,主要是ActorCritic类

  1. 导入所需的库和模块。
  2. change_tokenization函数:用于在两个不同的分词器之间转换给定的tokens。
  3. check_model_family函数:检查两个配置是否属于相同的模型家族。
  4. ActorCritic类:包含了actor和critic模型,并用于在训练actor过程中为给定的序列生成动作和值。它包括以下方法:
    • __init__:初始化actor和critic模型
          def __init__(self, config: Config) -> None:
              super().__init__()
              self.config = config
      
              self.actor = ActorModel(config.actor)
      
              # check if critic must be initialized from reward model
              ModelLoader.init_critic_from_reward(config.critic)
              self.critic = CriticModel(config.critic)
      
              # if the actor and critic use the same tokenizer is set to True
              self.use_same_tokenizer = False
      
              # debug flag
              self.debug = config.actor.
    • load:加载模型,但未实现。
    • save:将模型保存到路径。
    • forward:基于给定的整个序列,使用actor的forward方法获取序列中每个token的logits,并使用critic的forward方法获取每个生成步骤的值。

这个代码主要用于强化学习训练自然语言生成模型。ActorCritic类是其中的核心部分,它包含了actor和critic模型。这两个模型在训练过程中相互协作,用于生成动作和值。

其次,主要是关于一个用于生成动作、动作逻辑、价值和序列的生成函数,以及用于存储训练数据和生成训练示例的类

  1. 首先定义了一个名为generate的函数,它使用了@torch.no_grad()和@beartype修饰器。这个函数接收四个参数,分别是states_actor、states_mask_actor和states_critic,并返回一个元组
    这个函数的主要目的是从输入状态生成动作、动作逻辑、价值和序列。它首先从actor模型生成动作序列,然后创建一个用于actor序列的mask。接下来,它检查是否需要为critic使用不同的编码。如果需要,它将使用change_tokenization函数来更改序列的编码。接着,它生成动作逻辑和价值。如果处于调试模式,将打印一些调试信息。
  2. 接下来,代码定义了一个名为Memory的namedtuple,用于存储每个经验的数据。Memory包含了11个字段,如states_actor、actions、values等。
  3. 然后定义了一个名为ExperienceDataset的类,它继承自torch.utils.data.Dataset。这个类用于训练actor-critic模型。它接收一个memories参数和一个device参数。memories参数是一个包含Memory实例的双端队列。device参数表示要在哪个设备上进行计算。这个类实现了__len__和__getitem__方法,使其可以像普通的PyTorch数据集一样使用。
  4. 最后,定义了一个名为ExamplesSampler的类,用于从JSON文件中读取示例并在需要时抽样。这个类接收一个表示文件路径的path参数。在初始化时,它从文件中读取数据,并将其存储在self.data中。它还实现了一个名为sample的方法,用于从数据中抽取指定数量的示例。

再之后,定义了一个名为 RLTrainer 的类,用于使用强化学习训练一个Actor-Critic模型。该类具有多个属性和方法,用于训练过程中的各种操作。

  • __init__ 方法中,初始化了训练器的各个组件,包括Actor-Critic模型、actor和critic优化器、reward模型、用于存储训练统计数据和对话记录的类、以及示例采样器
  • save_checkpoint 方法保存了当前状态的Actor-Critic模型的检查点,包括当前的训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。
  • load_checkpoint 方法加载了Actor-Critic模型的检查点,包括训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。如果没有找到检查点,则返回轮数0。如果actor和critic的检查点存在差异,则从两者中最小的轮数开始训练。

再之后,调用 learn 方法更新actor和critic模型,并保存训练统计数据和对话记录

  1. 使用智能体-评论家模型计算新的动作概率和价值
                    # get actor critic new probabilities and values
                    actions_logits, values = self.actorcritic.forward(
                        sequences_actor,
                        sequences_mask_actor,
                        sequences_critic,
                        sequences_mask_critic,
                        action_len_actor.item(),
                        action_len_critic.item(),
                    )
  2. 计算动作的对数概率、熵和KL散度损失
                    # get action log prob
                    actions_prob = (
                        torch.softmax(actions_logits, dim=-1).max(dim=-1).values
                    )
                    actions_log_prob = torch.log(actions_prob + self.eps)
    
                    # compute entropy,一般表示为-sum「p(x)logp(x)」
                    entropies = (actions_prob * actions_log_prob).sum(dim=-1)
    
                    # compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」
                    kl_div_loss = (
                        (actions_prob * (old_actions_log_probs - actions_log_prob))
                        .sum(dim=-1)
                        .mean()
                    )
  3. 计算重要性权重比率(ratios),即新旧策略的概率比
                    # compute ratios
                    ratios = (actions_log_prob - old_actions_log_probs).exp()
  4. 计算PPO损失,包括优势函数的计算和PPO-clip算法的应用
    首先我们回顾下强化学习极简入门一文里对『近端策略优化裁剪PPO-clip』的阐述

    简单实现的话,即是
    # ratios即为重要性权重,exp代表求期望,括号里的environment_log_probs代表用于与环境交互的策略
    ratios = torch.exp(log_probs - environment_log_probs)
    
    # 分别用sur_1、sur_2来计算公式的两部分
    # 第一部分是重要性权重乘以优势函数
    sur_1 = ratios * advs
     
    # 第二部分是具体的裁剪过程
    sur_2 = torch.clamp(ratios, 1 - clip_eps, 1 + clip_eps) * advs
     
    # 最终看谁更小则取谁
    clip_loss = -torch.min(sur_1,sur_2).mean()
    更具体的实现,则可以如下所示
                    # 计算PPO总损失
                    if check_model_family(self.config.actor, self.config.critic):
                        # 计算 TRL 中的折扣回报
                        gamma = self.config.trainer.gamma_discounted
    
                        # 初始化折扣回报矩阵
                        discounted_rewards = torch.zeros_like(old_values)
    
                        # 遍历每个时间步
                        for i in range(discounted_rewards.shape[1]):
                            for j in range(i, discounted_rewards.shape[1]):
                                # 计算折扣回报
                                discounted_rewards[:, i] += (
                                    gamma ** (j - i) * rewards[:, j]
                                )
    
                        # 计算优势值,与TRL 中旧值的符号相反
                        advantages = (
                            discounted_rewards - old_values
                        )
    
                        # normalize advantages
                        advantages = (advantages - advantages.mean(dim=-1)) / (
                            advantages.std() + self.eps
                        )
    
                        surr1 = advantages * ratios
                    else:
                        advantages = rewards - old_values[:, -1]
                        surr1 = advantages * ratios
    
                    surr2 = (
                        torch.clamp(ratios, 1 - actor_eps_clip, 1 + actor_eps_clip)
                        * advantages
                    )
  5. 计算策略损失和总损失
                    policy_loss = -torch.min(surr1, surr2) - beta_s * entropies
                    policy_loss = policy_loss.mean()
                    loss = policy_loss + kl_div_loss
    可能有读者看到这里 看迷糊了,即咋出来两个损失函数了,看起来是一个策略损失,一个KL散度损失,其实你仔细一看上面的代码,你会发现本项目中的PPO 算法引入的这两个损失函数:
      一个 policy loss,本质是一个目标函数(具体用的近端策略优化裁剪PPO-clip与熵的差值)

    其中,表示在当前策略下采样得到的经验的无偏估计, 是策略比率,是优势函数, 是超参数,用于控制策略更新的幅度,是熵的系数
      另一个 KL散度损失(kl_div_loss),用于限制新旧 policy 之间的差异,以免更新太快,导致学习不稳定

    最终,总的损失函数为:

    其中, 是超参数,用于控制 KL 散度损失的权重

  6. 如果损失值为NaN,抛出异常
                    # check if loss item is NaN
                    if torch.isnan(loss):
                        raise ValueError("Loss is nan")
  7. 更新策略,包括使用DeepSpeed或Accelerate库进行优化
                    # 按照损失更新 actor 模型参数
                    # 使用 DeepSpeed 的 engine 对 loss 进行反向传播
                    if self.config.actor.deepspeed_enable:
                        actor_model_engine.backward(loss)
                        actor_model_engine.step()
    
                    # 如果启用了 PyTorch 的 Accelerate
                    elif self.config.actor.accelerate_enable:
                        # 将 actor 模型参数的梯度清零
                        self.actor_optimizer.zero_grad()
    
                        # 使用 Accelerate 对 loss 进行反向传播
                        actor_accelerator.backward(loss)
    
                        # 使用 PyTorch 的优化器更新 actor 模型参数
                        self.actor_optimizer.step()
    
                        # 使用 PyTorch 的学习率调度器更新学习率
                        self.actor_scheduler.step()
                    else:
                        self.actor_optimizer.zero_grad()
    
                        # 对 loss 进行反向传播
                        loss.backward()
                        self.actor_optimizer.step()
                        self.actor_scheduler.step()
    
  8. 计算价值损失
                    # compute value loss
                    # 损失是奖励与值之间的距离
                    # 我希望这个距离尽可能小,以便值能够代表奖励,因此我在两者之间取最大值
                    # 裁剪限制了值损失剪辑的变化速率
                    value_loss_clipped = old_values + (values - old_values).clamp(
                        -critic_eps_clip, critic_eps_clip
                    )
    
                    # 计算第一种值损失,即裁剪后的值与奖励之间的平方差
                    value_loss1 = (value_loss_clipped - rewards) ** 2
    
                    # 计算第二种值损失,即未裁剪的值与奖励之间的平方差
                    value_loss2 = (values - rewards) ** 2
    
                    # 选择两种值损失中较大的那个,并计算其均值
                    value_loss = torch.max(value_loss1, value_loss2).mean()
  9. 如果价值损失为NaN,抛出异常
                    if torch.isnan(value_loss):
                        raise ValueError("Value loss is nan")
  10. 更新评论家,包括使用DeepSpeed或Accelerate库进行优化
                    # upate critic
                    if self.config.critic.deepspeed_enable:
                        critic_model_engine.backward(value_loss)
                        critic_model_engine.step()
                    elif self.config.critic.accelerate_enable:
                        self.critic_optimizer.zero_grad()
                        critic_accelerator.backward(loss)
                        self.critic_optimizer.step()
                        self.critic_scheduler.step()
                    else:
                        self.critic_optimizer.zero_grad()
                        value_loss.backward()
                        self.critic_optimizer.step()
                        self.critic_scheduler.step()
  11. 将损失值追加到训练统计信息中
                    # 将训练损失值添加到训练统计信息中
                    self.training_stats.training_loss.append(
                        # 将损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型
                        loss.detach().cpu().item()
                    )
    
                    # 将价值损失值添加到训练统计信息中
                    self.training_stats.value_loss.append(
                        # 将价值损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型
                        value_loss.detach().cpu().item()
                    )
  12. 输出迭代信息
                    # print iteration info
                    print(
                        f"Epoch {epoch+1}/{epochs}",
                        f"Step {k+1}/{int(len(dataloader) / batch_size)}",
                        f"Loss {loss.detach().cpu().item():.4f}",
                        f"Value Loss {value_loss.detach().cpu().item():.4f}",
                    )
  13. 训练循环结束后,将智能体-评论家模型设为评估模式并输出训练结束信息
            self.actorcritic.eval()
            print("End Learning")

最后的最后,定义了一个 train() 方法,使用 actor-critic 算法训练强化学习模型。方法首先初始化各种设置,如训练的总 episode 数量、每个 episode 的最大步数、批次大小和训练设备等。然后检查要用于学习的记忆数量是否是批次大小的倍数,以及总步数是否是更新步数的倍数。

该方法初始化记忆,加载检查点(如果有的话),如果是从头开始的新训练,则清除会话记录。然后循环遍历 episode 和 timestep,从示例数据集中抽取样本,为 actor 和 critic 进行分词,生成动作和值的序列,计算动作日志概率,计算奖励。存储每个 episode/timestep 的记忆,并将完成(解码后的动作)记录在会话日志中。

在一定数量的 timestep 后,使用记忆进行学习,并计算平均奖励。该过程重复进行,直到训练完成。该方法在训练结束时保存模型和会话日志。


第七部分 ColossalChat:通过self-instruct技术指令微调LLaMA且加上RLHF

7.1 技术架构:通过self-instruct生成的中英双语数据集 + 三阶段训练方式

据介绍(介绍页面,该页面的翻译之一,代码地址),Colossal-AI 开源了基于 LLaMA-7B 模型的包含完整 RLHF 流程的类 Chat 模型复现方案 ColossalChat

7.1.1 针对社交平台的种子数据且利用self-instruct 技术生成中英双语数据集

ColossalChat 收集并清洗了社交平台上人们的真实提问场景作为种子数据集,然后利用 self-instruct 技术扩充数据(通过prompt OpenAI API,花费约 900 美元进行标注),最终生成了10.4万条问答的中、英双语数据集(这是数据的开源地址)
他们的说法是,对比其他 self-instruct 方法生成的数据集,该数据集的种子数据更加真实、丰富,生成的数据集涵盖的话题更多,该数据可以同时用于微调和 RLHF 训练,通过高质量的数据,ColossalChat 能进行更好地对话交互,同时支持中文

7.1.2​ ColossalChat训练方式:类似instructGPT/ChatGPT的训练三步骤

关于训练方式:类似instructGPT/ChatGPT的训练三步骤(如果忘了,务必复习下此文的3.1节)

  • Stage1 是supervised-fintuning,即使用上文提到的数据集进行监督微调
  • Stage2 训练一个奖励模型(初始化为阶段1的SFT模型),它通过模型对于同一个 prompt 的不同输出进行人工排序,根据排序结果监督训练出一个奖励模型
  • Stage3 是通过阶段2训练出来的奖励函数微调出一个RL模型,微调过程中通过PPO算法限制RL模型的参数更新范围(以阶段1的SFT模型的策略为参考基准,PPO算法避免与基线模型SFT的策略偏离过远)

具体而言,为两个阶段进行:​

  • 如上图底部,首先是 Make Experience 部分,利用 SFT 、Actor、RM、Critic模型计算生成 Experience 存入 buffer 中;

    之后是参数更新部分,利用 Experience 计算价值损失(value loss)​和策略损失(policy loss),具体说明在此文的4.4.3节有介绍

  • 如上图顶部即是PTX 部分(上面的目标函数​中加在最后的偏置项)
    ColossalChat 计算 Actor 的现有输出response 和预训练语料的回答部分的交叉熵损失函数(calculates the cross-entropy loss between the Actor’s output response and the response part of the input corpus)
    用来在 PPO 梯度中加入预训练梯度(add pre-training gradients to the PPO gradient)
    以保持语言模型比如GPT2原有的核心性能(maintain the language model’s original performance and prevent forgetting),防止忘了最早从哪里出发的(GPT2 ​ SFT ​ RM ​ RLHF)
  • 最后将策略损失、价值损失和 PTX 损失加和(the policy loss, value loss, and PTX loss are summed up),进行反向传播和参数更新 

7.2 代码实现:SFT模型 + 奖励模型 + PPO training

先看下整体的代码架构图


接下来,我们看下一些关键实现

7.2.1 首先,训练一个SFT模型

首先通过ColossalAI/applications/Chat/coati/trainer/sft.py,训练一个SFT模型

import math
import time
from abc import ABC
from typing import Optional

import loralib as lora
import torch
import torch.distributed as dist
import wandb
from coati.models.loss import GPTLMLoss
from torch import nn
from torch.optim import Adam, Optimizer
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.trainer import get_scheduler

from colossalai.logging import get_dist_logger

from .strategies import Strategy
from .utils import is_rank_0


class SFTTrainer(ABC):
    """
        Trainer to use while training reward model.

    Args:
        model (torch.nn.Module): the model to train
        strategy (Strategy): the strategy to use for training
        optim(Optimizer): the optimizer to use for training
        train_dataloader: the dataloader to use for training
        eval_dataloader: the dataloader to use for evaluation
        batch_size (int, defaults to 1): the batch size while training
        max_epochs (int, defaults to 2): the number of epochs to train
        optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer
    """

    def __init__(
        self,
        model,
        strategy: Strategy,
        optim: Optimizer,
        train_dataloader: DataLoader,
        eval_dataloader: DataLoader = None,
        batch_size: int = 1,
        max_epochs: int = 2,
        accimulation_steps: int = 8,
    ) -> None:
        super().__init__()
        self.strategy = strategy
        self.epochs = max_epochs
        self.train_dataloader = train_dataloader
        self.eval_dataloader = eval_dataloader

        self.model = strategy.setup_model(model)
        if "DDP" in str(self.strategy):
            self.model = self.model.module
        self.optimizer = strategy.setup_optimizer(optim, self.model)

        self.accimulation_steps = accimulation_steps
        num_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps
        max_steps = math.ceil(self.epochs * num_update_steps_per_epoch)

        self.scheduler = get_scheduler("cosine",
                                       self.optimizer,
                                       num_warmup_steps=math.ceil(max_steps * 0.03),
                                       num_training_steps=max_steps)

    def fit(self, logger, log_interval=10):
        wandb.init(project="Coati", name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        wandb.watch(self.model)
        total_loss = 0
        # epoch_bar = tqdm(range(self.epochs), desc='Epochs', disable=not is_rank_0())
        step_bar = tqdm(range(len(self.train_dataloader) // self.accimulation_steps * self.epochs),
                        desc=f'steps',
                        disable=not is_rank_0())
        for epoch in range(self.epochs):

            # process_bar = tqdm(range(len(self.train_dataloader)), desc=f'Train process for{epoch}', disable=not is_rank_0())
            # train
            self.model.train()
            for batch_id, batch in enumerate(self.train_dataloader):

                prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
                p_mask = batch["attention_mask"].to(torch.cuda.current_device())
                labels = batch["labels"].to(torch.cuda.current_device())
                # prompt_ids = prompt_ids.squeeze(1).cuda()
                # p_mask = p_mask.squeeze(1).cuda()
                # prompt_logits = self.model(prompt_ids, attention_mask=p_mask, labels=labels)

                outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)

                loss = outputs.loss
                prompt_logits = outputs.logits

                if loss >= 2.5:
                    logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}")

                loss = loss / self.accimulation_steps

                self.strategy.backward(loss, self.model, self.optimizer)

                total_loss += loss.item()

                # gradient accumulation
                if (batch_id + 1) % self.accimulation_steps == 0:
                    self.strategy.optimizer_step(self.optimizer)
                    self.optimizer.zero_grad()
                    self.scheduler.step()
                    wandb.log({
                        "loss": total_loss / self.accimulation_steps,
                        "lr": self.scheduler.get_last_lr()[0],
                        "epoch": epoch,
                        "batch_id": batch_id
                    })
                    total_loss = 0
                    step_bar.update()

                # if batch_id % log_interval == 0:
                # logger.info(f'Train Epoch {epoch}/{self.epochs} Batch {batch_id} Rank {dist.get_rank()} loss {loss.item()}')
                # wandb.log({"loss": loss.item()})

                # process_bar.update()

            # eval
            if self.eval_dataloader is not None:
                self.model.eval()
                with torch.no_grad():
                    loss_sum = 0
                    num_seen = 0
                    for batch in self.eval_dataloader:
                        prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
                        p_mask = batch["attention_mask"].to(torch.cuda.current_device())
                        labels = batch["labels"].to(torch.cuda.current_device())
                        # prompt_ids = prompt_ids.squeeze(1).cuda()
                        # p_mask = p_mask.squeeze(1).cuda()

                        outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
                        loss = outputs.loss
                        # prompt_logits = outputs.logits

                        loss_sum += loss.item()
                        num_seen += prompt_ids.size(0)

                    loss_mean = loss_sum / num_seen
                    if dist.get_rank() == 0:
                        logger.info(f'Eval Epoch {epoch}/{self.epochs} loss {loss_mean}')

            # epoch_bar.update()

    def save_model(self,
                   path: str,
                   only_rank0: bool = False,
                   tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
        self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)

7.2.2 训练一个奖励模型

其次,通过ColossalAI/applications/Chat/coati/trainer/rm.py 训练一个奖励模型

from abc import ABC
from datetime import datetime
from typing import Optional

import pandas as pd
import torch
import torch.distributed as dist
from torch.optim import Optimizer, lr_scheduler
from torch.utils.data import DataLoader, Dataset, DistributedSampler
from tqdm import tqdm
from transformers.tokenization_utils_base import PreTrainedTokenizerBase

from .strategies import Strategy
from .utils import is_rank_0


class RewardModelTrainer(ABC):
    """
        Trainer to use while training reward model.

    Args:
        这个类继承了 ABC 抽象基类。它接受以下参数:
        model:待训练的模型
        strategy:训练策略
        optim:优化器
        loss_fn:损失函数
        train_dataset:训练数据集
        valid_dataset:验证数据集
        eval_dataset:评估数据集
        batch_size:批次大小(默认为1)
        max_epochs:最大训练轮数(默认为2)
    """

    # 初始化 RewardModelTrainer 对象,配置模型、优化器、调度器等,并创建训练、验证和评估的数据加载器
    def __init__(
        self,
        model,
        strategy: Strategy,
        optim: Optimizer,
        loss_fn,
        train_dataset: Dataset,
        valid_dataset: Dataset,
        eval_dataset: Dataset,
        batch_size: int = 1,
        max_epochs: int = 1,
    ) -> None:
        super().__init__()
        self.strategy = strategy
        self.epochs = max_epochs
        train_sampler = None

        if dist.is_initialized() and dist.get_world_size() > 1:
            train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)
        self.train_dataloader = DataLoader(train_dataset,
                                           shuffle=(train_sampler is None),
                                           sampler=train_sampler,
                                           batch_size=batch_size)
        self.valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
        self.eval_dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=True)

        self.model = strategy.setup_model(model)
        self.loss_fn = loss_fn
        self.optimizer = strategy.setup_optimizer(optim, self.model)
        self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100)

    # 计算给定数据加载器上的准确率,计算选定奖励与拒绝奖励之间的平均距离
    def eval_acc(self, dataloader):
        dist = 0
        on = 0
        cnt = 0
        self.model.eval()
        with torch.no_grad():
            for chosen_ids, c_mask, reject_ids, r_mask in dataloader:
                chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
                c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
                reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
                r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
                chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
                reject_reward = self.model(reject_ids, attention_mask=r_mask)
                for i in range(len(chosen_reward)):
                    cnt += 1
                    if chosen_reward[i] > reject_reward[i]:
                        on += 1
                dist += (chosen_reward - reject_reward).mean().item()
            dist_mean = dist / len(dataloader)
            acc = on / cnt
        self.model.train()
        return dist_mean, acc

    # 用于实际训练模型。在每个训练轮中,它会遍历 train_dataloader,计算损失并更新模型
    # 每 100 步,它会计算验证集上的距离和准确率,并将其记录到日志文件中
    # 同时,在每个训练轮结束时,计算评估集上的距离和准确率
    def fit(self):
        time = datetime.now()
        epoch_bar = tqdm(range(self.epochs), desc='Train epoch', disable=not is_rank_0())
        for epoch in range(self.epochs):
            step_bar = tqdm(range(self.train_dataloader.__len__()),
                            desc='Train step of epoch %d' % epoch,
                            disable=not is_rank_0())
            # train
            self.model.train()
            cnt = 0
            acc = 0
            dist = 0
            for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader:
                chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
                c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
                reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
                r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
                chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
                reject_reward = self.model(reject_ids, attention_mask=r_mask)
                loss = self.loss_fn(chosen_reward, reject_reward)
                self.strategy.backward(loss, self.model, self.optimizer)
                self.strategy.optimizer_step(self.optimizer)
                self.optimizer.zero_grad()
                cnt += 1
                if cnt == 100:
                    self.scheduler.step()
                    dist, acc = self.eval_acc(self.valid_dataloader)
                    cnt = 0
                    if is_rank_0():
                        log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]],
                                           columns=['step', 'loss', 'dist', 'acc'])
                        log.to_csv('log_%s.csv' % time, mode='a', header=False, index=False)
                step_bar.update()
                step_bar.set_postfix({'dist': dist, 'acc': acc})

            # eval
            dist, acc = self.eval_acc(self.eval_dataloader)
            if is_rank_0():
                log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], columns=['step', 'loss', 'dist', 'acc'])
                log.to_csv('log.csv', mode='a', header=False, index=False)
            epoch_bar.update()
            step_bar.set_postfix({'dist': dist, 'acc': acc})
            step_bar.close()

    # 用于保存训练好的模型。可以选择仅在 rank 0 的进程上保存模型,并选择保存 tokenizer
    def save_model(self,
                   path: str,
                   only_rank0: bool = False,
                   tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
        self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)

7.2.3 通过trainer/ppo.py to start PPO training

最后,通过ColossalAI/applications/Chat/coati/trainer/ppo.py to start PPO training

from typing import Any, Callable, Dict, List, Optional

import torch
import torch.nn as nn
from coati.experience_maker import Experience, NaiveExperienceMaker
from coati.models.base import Actor, Critic
from coati.models.generation_utils import update_model_kwargs_fn
from coati.models.loss import PolicyLoss, ValueLoss
from coati.replay_buffer import NaiveReplayBuffer
from torch.optim import Optimizer
from transformers.tokenization_utils_base import PreTrainedTokenizerBase

from .base import Trainer
from .callbacks import Callback
from .strategies import Strategy


class PPOTrainer(Trainer):
    """
        Trainer for PPO algorithm.

    Args:
        接受以下参数:
        strategy:用于训练的策略
        actor:PPO算法中的执行者(actor)模型
        critic:PPO算法中的评论者(critic)模型
        reward_model:用于计算句子奖励的奖励模型
        initial_model:用于生成参考对数值的初始模型,以限制actor的更新
        actor_optim:用于actor模型的优化器
        critic_optim:用于critic模型的优化器
        其他参数:控制训练过程的各种超参数,如kl系数、批次大小等
    """

    def __init__(self,
                 strategy: Strategy,
                 actor: Actor,
                 critic: Critic,
                 reward_model: nn.Module,
                 initial_model: Actor,
                 actor_optim: Optimizer,
                 critic_optim: Optimizer,
                 kl_coef: float = 0.1,
                 ptx_coef: float = 0.9,
                 train_batch_size: int = 8,
                 buffer_limit: int = 0,
                 buffer_cpu_offload: bool = True,
                 eps_clip: float = 0.2,
                 value_clip: float = 0.4,
                 experience_batch_size: int = 8,
                 max_epochs: int = 1,
                 tokenizer: Optional[Callable[[Any], dict]] = None,
                 sample_replay_buffer: bool = False,
                 dataloader_pin_memory: bool = True,
                 callbacks: List[Callback] = [],
                 **generate_kwargs) -> None:
        experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef)
        replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload)
        generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor)
        super().__init__(strategy, experience_maker, replay_buffer, experience_batch_size, max_epochs, tokenizer,
                         sample_replay_buffer, dataloader_pin_memory, callbacks, **generate_kwargs)
        self.actor = actor
        self.critic = critic

        self.actor_loss_fn = PolicyLoss(eps_clip)
        self.critic_loss_fn = ValueLoss(value_clip)
        self.ptx_loss_fn = nn.CrossEntropyLoss(ignore_index=-100)
        self.ptx_coef = ptx_coef
        self.actor_optim = actor_optim
        self.critic_optim = critic_optim

    '''
     该方法根据经验(Experience)对象计算执行者(actor)和评论者(critic)的损失
     并使用策略进行反向传播和优化器更新
    '''
    def training_step(self, experience: Experience) -> Dict[str, float]:
        self.actor.train()
        self.critic.train()
        # policy loss
        num_actions = experience.action_mask.size(1)
        action_log_probs = self.actor(experience.sequences, num_actions, attention_mask=experience.attention_mask)
        actor_loss = self.actor_loss_fn(action_log_probs,
                                        experience.action_log_probs,
                                        experience.advantages,
                                        action_mask=experience.action_mask)

        # ptx loss
        if self.ptx_coef != 0:
            ptx = next(iter(self.pretrain_dataloader))['input_ids'].to(torch.cuda.current_device())
            label = next(iter(self.pretrain_dataloader))['labels'].to(torch.cuda.current_device())[:, 1:]
            attention_mask = next(iter(self.pretrain_dataloader))['attention_mask'].to(torch.cuda.current_device())
            ptx_log_probs = self.actor.get_base_model()(ptx, attention_mask=attention_mask)['logits'][..., :-1, :]
            ptx_loss = self.ptx_loss_fn(ptx_log_probs.view(-1, ptx_log_probs.size(-1)), label.view(-1))
            actor_loss = ptx_loss * self.ptx_coef + actor_loss * (1 - self.ptx_coef)

        self.strategy.backward(actor_loss, self.actor, self.actor_optim)
        self.strategy.optimizer_step(self.actor_optim)
        self.actor_optim.zero_grad()

        # value loss
        values = self.critic(experience.sequences,
                             action_mask=experience.action_mask,
                             attention_mask=experience.attention_mask)
        critic_loss = self.critic_loss_fn(values,
                                          experience.values,
                                          experience.reward,
                                          action_mask=experience.action_mask)
        self.strategy.backward(critic_loss, self.critic, self.critic_optim)
        self.strategy.optimizer_step(self.critic_optim)
        self.critic_optim.zero_grad()

        return {'reward': experience.reward.mean().item()}


def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None:
    origin_model = strategy._unwrap_actor(actor)
    new_kwargs = {**generate_kwargs}
    # use huggingface models method directly
    if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'):
        new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation

    if 'update_model_kwargs_fn' not in generate_kwargs:
        new_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn

    return new_kwargs


def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
    self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer)

在获得最终模型权重后,还可通过量化降低推理硬件成本,并启动在线推理服务,仅需单张约 4GB 显存的 GPU 即可完成 70 亿参数模型推理服务部署

更多推荐

类ChatGPT逐行代码解读(2/2):从零起步实现ChatLLaMA和ColossalChat

本文发布于:2023-04-29 03:08:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/69e643943dfc23457d8b1fb3c9e9f6df.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:代码   ChatGPT   ColossalChat   ChatLLaMA

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!

  • 110006文章数
  • 27961阅读数
  • 0评论数