学习笔记(一)Q learning 附代码"/>
强化学习笔记(一)Q learning 附代码
Q learning是一个决策过程,通过不断地尝试,根据选择的行为而得到的“奖励”来为所选择的这个行为“打分”,不停迭代得到最优的选择。
例如,你现在在做作业,你有两条行为选择,一是继续做作业,完成后可以得到棒棒糖(奖励),二是不做作业,选择看电视,这样的结果是收到惩罚。
你是第一次经历这种事情,不知道两种行为的后果,于是随机选择,假如你选择看电视,选择后没有奖励,也没有惩罚,于是你继续看电视,后来父母回来发现你没做作业在看电视,于是你收到了惩罚,因此在你的心里,为在做作业时选择看电视打低分,因为你经历过,并且受到了惩罚。
在Q learning中,创建一个Q表,代表的是每一个状态下不同行为的“分数”(这个是你在不断尝试中根据奖励和惩罚逐渐形成的),算法学习中就是在不断更新这个Q表,当算法learning的比较完美时,那么你就可以每次根据Q表来选择你的下一步行为。如下,Q(s1,a2)>Q(s1,a1),于是你选择a2的行为,如此反复即可。
注意:Q表的值最开始一般为0,在你不断尝试中更新Q表,最终可以根据这个Q表快速得到“奖励”。
那么,Q表是如何更新的呢?
如下,假如根据之前的学习,Q表的s1,s2状态已经更新到了这样,如果目前状态为s1,根据比对,Q(s1,a2)>Q(s1,a1),意思选择a2更可能获得奖励,于是选择a2行为,此时,我们将Q(s1,a2)作为我们的预测分数(即预测打分),预估分数为1.
选择a2行为后,来到s2状态,此时我们实际不做选择,我们假象做一个选择,这个选择可以让我们在s2状态得到最大的奖励,即假设我们选择a2行为(因为Q(s2,a2)>Q(s2,a1)),这时,我们把从s1状态下选择a2行为来到s2状态的奖励R与在s2状态下选择a2的奖励乘衰减值γ的和作为实际分数,即0(此时还未完成作业而得到棒棒糖) + γ2 ,取γ=0.9 ,则实际分数为:1.8
此时我们更新的是Q(s1,a2)。
那么新的Q(s1,a2) = 1 (老Q(s1,a2)) + α(实际分数-预估分数),去α=0.1,则新Q(s1,a2) = 1.08
注意:在选择行为是也不是完全按照Q表来决策,有时会90%根据Q表,10%采用随机选择。
这是算法的伪代码,其中α是学习率,γ是衰减度,e-greedy是行为选择策略(90%Q表,10%随机)
假设你近视,γ=1代表你有合适的眼镜,γ=0代表没有眼镜,其他代表度数的匹配度,当γ=1时,你可以清清楚楚看到s1状态后的所有奖励,而在γ=0时,你只能看到最近状态的奖励,即状态s2状态的奖励。
代码详细讲解参见莫烦python教程
import numpy as np
import pandas as pd
import timenp.random.seed(2) # reproducibleN_STATES = 6 # the length of the 1 dimensional world
ACTIONS = ['left', 'right'] # available actions
EPSILON = 0.9 # greedy police
ALPHA = 0.1 # learning rate
GAMMA = 0.9 # discount factor
MAX_EPISODES = 13 # maximum episodes
FRESH_TIME = 0.3 # fresh time for one movedef build_q_table(n_states, actions):table = pd.DataFrame(np.zeros((n_states, len(actions))), # q_table initial valuescolumns=actions, # actions's name)# print(table) # show tablereturn tabledef choose_action(state, q_table):# This is how to choose an actionstate_actions = q_table.iloc[state, :]if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()): # act non-greedy or state-action have no valueaction_name = np.random.choice(ACTIONS)else: # act greedyaction_name = state_actions.idxmax() # replace argmax to idxmax as argmax means a different function in newer version of pandasreturn action_namedef get_env_feedback(S, A):# This is how agent will interact with the environmentif A == 'right': # move rightif S == N_STATES - 2: # terminateS_ = 'terminal'R = 1else:S_ = S + 1R = 0else: # move leftR = 0if S == 0:S_ = S # reach the wallelse:S_ = S - 1return S_, Rdef update_env(S, episode, step_counter):# This is how environment be updatedenv_list = ['-']*(N_STATES-1) + ['T'] # '---------T' our environmentif S == 'terminal':interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)print('\r{}'.format(interaction), end='')time.sleep(2)print('\r ', end='')else:env_list[S] = 'o'interaction = ''.join(env_list) #become stringprint('\r{}'.format(interaction), end='')time.sleep(FRESH_TIME)def rl():# main part of RL loopq_table = build_q_table(N_STATES, ACTIONS)for episode in range(MAX_EPISODES):step_counter = 0S = 0is_terminated = Falseupdate_env(S, episode, step_counter)while not is_terminated:A = choose_action(S, q_table)S_, R = get_env_feedback(S, A) # take action & get next state and rewardq_predict = q_table.loc[S, A]if S_ != 'terminal':q_table.iloc[S_, :].max()q_target = R + GAMMA * q_table.iloc[S_, :].max() # next state is not terminalelse:q_target = R # next state is terminalis_terminated = True # terminate this episodeq_table.loc[S, A] += ALPHA * (q_target - q_predict) # updateS = S_ # move to next stateprint(q_table)update_env(S, episode, step_counter+1)step_counter += 1return q_tableif __name__ == "__main__":q_table = rl()print('\r\nQ-table:\n')print(q_table)
这里只简单讲一下部分重要代码:
#选择一个行为A = choose_action(S, q_table)#根据这个行为,到达下一个状态,并返回到达该状态的奖励R,注意,此时还没实际到达,s代表实际状态,当s=s_时才是实际到达 S_, R = get_env_feedback(S, A)#根据Q表获得预估奖励,即Q(s,a)q_predict = q_table.loc[S, A]if S_ != 'terminal':#计算实际奖励,q_table.iloc[S_, :].max()是s_状态下期望的最优选择(假设中的,并未实际选择)q_target = R + GAMMA * q_table.iloc[S_, :].max() # next state is not terminalelse:q_target = R # next state is terminalis_terminated = True # terminate this episode#更新Q表,S状态下的,即当前状态q_table.loc[S, A] += ALPHA * (q_target - q_predict) # update#此时才真正的走到下一个状态S = S_ # move to next state
注:Q表里的值只是潜在奖励,相当于可能获得奖励的几率,而不直接是奖励值
上面的例子是线性寻找,那在二维坐标系中该如何寻找呢?
点击获取完整代码
在这样的迷宫里,红点需要找到黄点,且不能走到黑点,即在黄点有奖励,在黑点有惩罚。
在二维迷宫里寻找目标,原理和前面线性寻找一样,只不过每个状态的记录方式有变化,前面线性的状态可以直接用Q表的行序号代表,而二维迷宫里,每个状态需要用小正方形在二维坐标系的坐标来计算。
每次可选择的行为有4钟,上下左右(分别用0123表示),每次选择行为函数:
def choose_action(self, observation):self.check_state_exist(observation)# action selection#epsilon = e_greedy=0.9,即选择策略,if np.random.uniform() < self.epsilon:# choose best actionstate_action = self.q_table.loc[observation, :]# some actions may have the same value, randomly choose on in these actionsaction = np.random.choice(state_action[state_action == np.max(state_action)].index)else:# choose random actionaction = np.random.choice(self.actions)return action
选择了行为后,则需要计算该行为可获得的奖励以及下一个状态
def step(self, action):#计算正方形坐标位移的值s = self.canvas.coords(self.rect)base_action = np.array([0, 0])if action == 0: # upif s[1] > UNIT:base_action[1] -= UNITelif action == 1: # downif s[1] < (MAZE_H - 1) * UNIT:base_action[1] += UNITelif action == 2: # rightif s[0] < (MAZE_W - 1) * UNIT:base_action[0] += UNITelif action == 3: # leftif s[0] > UNIT:base_action[0] -= UNIT# 移动矩阵self.canvas.move(self.rect, base_action[0], base_action[1]) # 移动后的坐标作为下一个状态s_ = self.canvas.coords(self.rect) # 计算奖励,done代表是否找到终点或“黑洞”if s_ == self.canvas.coords(self.oval):reward = 1done = Trues_ = 'terminal'elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:reward = -1done = Trues_ = 'terminal'else:reward = 0done = Falsereturn s_, reward, done
之后根据上面计算结果更新当前状态的Q表。
def learn(self, s, a, r, s_):self.check_state_exist(s_)#预测值,即当前状态s下a行为的Q表值q_predict = self.q_table.loc[s, a]if s_ != 'terminal':#实际值,到达s_状态的奖励r + γ*s_状态下最优选择的Q表值q_target = r + self.gamma * self.q_table.loc[s_, :].max() # next state is not terminalelse:q_target = r # next state is terminal#更新Q表self.q_table.loc[s, a] += self.lr * (q_target - q_predict) # update
在上面的代码第二行中有句
self.check_state_exist(s_)
前面线性寻找中,Q表是最开始就直接初始化好了的,在迷宫中,假如我们最开始不知道迷宫有多大,那么初始化时Q表时不能写死,只能定义一个空表
self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)
在每次尝试中,当到达一个新状态时,首先需要在Q表中寻找是否有该状态,即函数check_state_exist,如果该妆态在Q表中不存在,则需要添加该状态
def check_state_exist(self, state):if state not in self.q_table.index:# 添加新状态,值均为0self.q_table = self.q_table.append(pd.Series([0]*len(self.actions),index=self.q_table.columns,name=state,))
更新Q表的主循环如下:
def update():for episode in range(100):# 获取初始状态,即最左上角的位置坐标observation = env.reset()while True:# 刷新视图env.render()# 选择下一个行为action = RL.choose_action(str(observation))# 计算下一个状态,奖励以及是否结束该次寻找observation_, reward, done = env.step(action)# 学习过程,即更新Q表RL.learn(str(observation), action, reward, str(observation_))# swap observationobservation = observation_# break while loop when end of this episodeif done:break
更多推荐
强化学习笔记(一)Q learning 附代码
发布评论