强化学习 原理与Python实现(三)

编程入门 行业动态 更新时间:2024-10-11 17:25:29

强化学习 <a href=https://www.elefans.com/category/jswz/34/1770123.html style=原理与Python实现(三)"/>

强化学习 原理与Python实现(三)

冰面滑行 FrozenLake-v0

import numpy as np
np.random.seed(0)
import gym

 环境使用

env = gym.make('FrozenLake-v0')
env.seed(0)
print('观察空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('观测空间大小 = {}'.format(env.unwrapped.nS))
print('动作空间大小 = {}'.format(env.unwrapped.nA))
env.unwrapped.P[14][2] # 查看动力
观察空间 = Discrete(16)
动作空间 = Discrete(4)
观测空间大小 = 16
动作空间大小 = 4

 

[(0.3333333333333333, 14, 0.0, False),(0.3333333333333333, 15, 1.0, True),(0.3333333333333333, 10, 0.0, False)]

用随机策略玩

def play_policy(env, policy, render=False):total_reward = 0.observation = env.reset()while True:if render:env.render() # 此行可显示action = np.random.choice(env.action_space.n,p=policy[observation])observation, reward, done, _ = env.step(action)total_reward += reward  # 统计回合奖励if done: # 游戏结束breakreturn total_reward

 

# 随机策略
random_policy = \np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nAepisode_rewards = [play_policy(env, random_policy)  for _ in range(100)]
print("随机策略 平均奖励:{}".format(np.mean(episode_rewards)))
随机策略 平均奖励:0.04

 策略评估

def v2q(env, v, s=None, gamma=1.): # 根据状态价值函数计算动作价值函数if s is not None: # 针对单个状态求解q = np.zeros(env.unwrapped.nA)for a in range(env.unwrapped.nA):for prob, next_state, reward, done in env.unwrapped.P[s][a]:q[a] += prob * \(reward + gamma * v[next_state] * (1. - done))else: # 针对所有状态求解q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))for s in range(env.unwrapped.nS):q[s] = v2q(env, v, s, gamma)return qdef evaluate_policy(env, policy, gamma=1., tolerant=1e-6):v = np.zeros(env.unwrapped.nS) # 初始化状态价值函数while True: # 循环delta = 0for s in range(env.unwrapped.nS):vs = sum(policy[s] * v2q(env, v, s, gamma)) # 更新状态价值函数delta = max(delta, abs(v[s]-vs)) # 更新最大误差v[s] = vs # 更新状态价值函数if delta < tolerant: # 查看是否满足迭代条件breakreturn v

 评估随机策略的价值函数

print('状态价值函数:')
v_random = evaluate_policy(env, random_policy)
print(v_random.reshape(4, 4))print('动作价值函数:')
q_random = v2q(env, v_random)
print(q_random)

 

状态价值函数:
[[0.0139372  0.01162942 0.02095187 0.01047569][0.01624741 0.         0.04075119 0.        ][0.03480561 0.08816967 0.14205297 0.        ][0.         0.17582021 0.43929104 0.        ]]
动作价值函数:
[[0.01470727 0.01393801 0.01393801 0.01316794][0.00852221 0.01162969 0.01086043 0.01550616][0.02444416 0.0209521  0.02405958 0.01435233][0.01047585 0.01047585 0.00698379 0.01396775][0.02166341 0.01701767 0.0162476  0.01006154][0.         0.         0.         0.        ][0.05433495 0.04735099 0.05433495 0.00698396][0.         0.         0.         0.        ][0.01701767 0.04099176 0.03480569 0.04640756][0.0702086  0.11755959 0.10595772 0.05895286][0.18940397 0.17582024 0.16001408 0.04297362][0.         0.         0.         0.        ][0.         0.         0.         0.        ][0.08799662 0.20503708 0.23442697 0.17582024][0.25238807 0.53837042 0.52711467 0.43929106][0.         0.         0.         0.        ]]

 策略改进

def improve_policy(env, v, policy, gamma=1.):optimal = Truefor s in range(env.unwrapped.nS):q = v2q(env, v, s, gamma)a = np.argmax(q)if policy[s][a] != 1.:optimal = Falsepolicy[s] = 0.policy[s][a] = 1.return optimal

 对随机策略进行改进

policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:print('无更新,最优策略为:')
else:print('有更新,更新后的策略为:')
print(policy)

 

有更新,更新后的策略为:
[[1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][0. 0. 0. 1.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 0. 1.][0. 1. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][1. 0. 0. 0.][0. 0. 1. 0.][0. 1. 0. 0.][1. 0. 0. 0.]]

 策略迭代

def iterate_policy(env, gamma=1., tolerant=1e-6):# 初始化为任意一个策略policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) \/ env.unwrapped.nAwhile True:v = evaluate_policy(env, policy, gamma, tolerant) # 策略评估if improve_policy(env, v, policy): # 策略改进breakreturn policy, vpolicy_pi, v_pi = iterate_policy(env)
print('状态价值函数 =')
print(v_pi.reshape(4, 4))
print('最优策略 =')
print(np.argmax(policy_pi, axis=1).reshape(4, 4))

 

状态价值函数 =
[[0.82351246 0.82350689 0.82350303 0.82350106][0.82351416 0.         0.5294002  0.        ][0.82351683 0.82352026 0.76469786 0.        ][0.         0.88234658 0.94117323 0.        ]]
最优策略 =
[[0 3 3 3][0 0 0 0][3 1 0 0][0 2 1 0]]

 测试策略

episode_rewards = [play_policy(env, policy_pi)  for _ in range(100)]
print("策略迭代 平均奖励:{}".format(np.mean(episode_rewards)))

 

策略迭代 平均奖励:0.77

 价值迭代

def iterate_value(env, gamma=1, tolerant=1e-6):v = np.zeros(env.unwrapped.nS) # 初始化while True:delta = 0for s in range(env.unwrapped.nS):vmax = max(v2q(env, v, s, gamma)) # 更新价值函数delta = max(delta, abs(v[s]-vmax))v[s] = vmaxif delta < tolerant: # 满足迭代需求breakpolicy = np.zeros((env.unwrapped.nS, env.unwrapped.nA)) # 计算最优策略for s in range(env.unwrapped.nS):a = np.argmax(v2q(env, v, s, gamma))policy[s][a] = 1.return policy, vpolicy_vi, v_vi = iterate_value(env)
print('状态价值函数 =')
print(v_vi.reshape(4, 4))
print('最优策略 =')
print(np.argmax(policy_vi, axis=1).reshape(4, 4))

 

状态价值函数 =
[[0.82351232 0.82350671 0.82350281 0.82350083][0.82351404 0.         0.52940011 0.        ][0.82351673 0.82352018 0.76469779 0.        ][0.         0.88234653 0.94117321 0.        ]]
最优策略 =
[[0 3 3 3][0 0 0 0][3 1 0 0][0 2 1 0]]

 测试策略

episode_rewards = [play_policy(env, policy_vi) for _ in range(100)]
print("价值迭代 平均奖励:{}".format(np.mean(episode_rewards)))

 

价值迭代 平均奖励:0.7

更多推荐

强化学习 原理与Python实现(三)

本文发布于:2024-03-09 05:39:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1724013.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:原理   Python

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!