要查看图文并茂的教程,请移步: http://studyai.com/pytorch-1.4/intermediate/reinforcement_q_learning.html
本教程演示如何使用PyTorch在 OpenAI Gym 的手推车连杆(CartPole-v0)任务 上训练深度Q-学习的智能体(Deep Q Learning(DQN)agent)。 任务(Task)
智能体(agent)必须在两个动作(action)之间做出决定——向左或向右移动手推车(cart)——这样连在手推车上的杆子(pole)就可以保持直立。 你可以在 Gym 网站 上找到一个包含各种算法和可视化的官方排行榜。
cartpole
上图显示了手推车连杆的运行画面(cartpole)
当智能体观察环境的当前状态(state)并选择一个动作时,环境将迁移(transitions)到一个新状态, 并返回一个表明该动作所造成的结果的奖励(reward)。在这项任务中,每增加一个时间步,奖励为+1, 如果杆子掉得太远或推车偏离中心超过2.4个单位距离,则环境终止。 这意味着表现更好的情景将持续更长的时间,积累更大的回报。
手推车连杆(CartPole)任务的设计使得对智能体的输入是4个表示环境状态(位置、速度等)的实数值(real values)。 然而,神经网络完全可以通过观察场景(looking at the scene)来解决任务,因此我们将使用从屏幕上扣下来的 以购物车为中心的图像块(image patch)作为输入。正因为如此,我们的结果无法直接与官方排行榜的结果相比 ——我们的任务要困难得多。不幸的是,这会减慢训练速度,因为我们必须渲染(render)所有帧。
严格地说,我们将把状态(state)表示为当前屏幕上扣取的图像块和上一个屏幕上扣取的图像块之间的差分(difference)。 这将允许智能体从一个图像中把连杆的速度也考虑进去。
依赖包(Packages)
首先,我们导入依赖包. 第一个依赖包是 gym ,用于产生手推车连杆环境(environment), 安装方式为(pip install gym)。其他的依赖包来自于PyTorch:
神经网络 (torch.nn)
优化 (torch.optim)
自动微分 (torch.autograd)
视觉任务工具集 (torchvision - a separate package).
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
env = gym.make('CartPole-v0').unwrapped
# 设置 matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# 查看 GPU 是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
回放记忆/内存(Replay Memory)
我们将使用经验回放记忆(experience replay memory)来训练我们的DQN。 它存储智能体观察到的状态迁移(transitions),允许我们稍后重用这些数据。 通过对过往经验随机抽样,可以构造一个不相关的训练批次。结果表明,该方法大大稳定和改进了DQN训练过程。
为了实现上述功能, 我们需要定义两个类:
Transition - 一个命名元组(named tuple)用于表示环境中的单次状态迁移(single transition)。
该类的作用本质上是将状态-动作对[(state, action) pairs]映射到他们的下一个结果,即[(next_state, action) pairs], 其中的 状态(state)是指从屏幕上获得的差分图像块(screen diifference image)。
ReplayMemory - 一个大小有限的循环缓冲区,用于保存最近观察到的迁移(transition)。 该类还实现了一个采样方法 .sample() 用来在训练过程中随机的选择一个迁移批次(batch of transitions)。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
"""Saves a transition."""
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
现在,让我们定义我们的模型。但首先,让我们快速回顾一下 DQN 是什么。
DQN 算法
我们的环境是确定性的,所以为了简单起见,这里给出的所有方程也都是确定性的。 在强化学习文献中,它们还包含对环境中随机迁移(stochastic transitions)的期望。
我们的目标是训练一个策略,该策略能使打折后的累计奖励最大化。 Rt0=∑∞t=t0γt−t0rt
, 其中 Rt0 也被称之为 回报(return). 折扣因子, γ, 应该是一个 0 到 1
之间的常量,以保证累计求和是可收敛的。 对我们的智能体来说,折扣因子使得来自遥远未来的奖励(far future rewards)不如即将到来的奖励(near future rewards)重要。 因为遥远未来的奖励的不确定性要大于即将到来的奖励的不确定性。
Q-learning 背后的思想是: 如果我们有一个函数 Q∗:State×Action→R
能够 告诉我们可以获得的回报是多少, 那么如果要在某个给定的状态上采取一个最优动作,只需要简单的构建一个能够使可获得的回报最大化的策略即可:
π∗(s)=argmaxa Q∗(s,a)
然而, 我们并不知道外部世界环境的所有完整信息,所以我们没有机会得到 Q∗
。 但是,由于神经网络是通用函数逼近器,我们可以简单地创建一个神经网络并训练它,使它与 Q∗
趋同。
对于我们的训练更新规则,我们将使用一个事实,即某个策略的每一个 Q
函数都遵循 贝尔曼方程:
Qπ(s,a)=r+γQπ(s′,π(s′))
等式两边的差称为时间差误差(temporal difference error),δ
:
δ=Q(s,a)−(r+γmaxaQ(s′,a))
为了最小化这个误差, 我们将使用的损失函数为: Huber loss. 当误差很小时,Huber损失的作用类似于均方误差;但当误差较大时,它的作用类似于平均绝对误差—— 这使得当 Q
的估计值带有非常大的噪声时,损失对异常值更加稳健鲁棒。 我们通过从回放记忆/缓存(replay memory)中采样的一个批次的迁移样本(transition samples) B
, 来计算Huber损失:
L=1|B|∑(s,a,s′,r) ∈ BL(δ)
whereL(δ)={12δ2|δ|−12for |δ|≤1,otherwise.
Q-网络(Q-network)
我们的模型将是一个卷积神经网络,它以当前屏幕图像块和以前屏幕图像块之间的差分作为输入。 它有两个输出,表示 Q(s,left)
和 Q(s,right) (其中 s
是网络的输入)。 实际上,该网络正试图预测在给定当前输入的情况下,采取每项行动的预期回报(expected return)。
class DQN(nn.Module):
def __init__(self, h, w, outputs):
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
self.bn1 = nn.BatchNorm2d(16)
self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
self.bn2 = nn.BatchNorm2d(32)
self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
self.bn3 = nn.BatchNorm2d(32)
# 线性层的输入连接数取决于conv2d层的输出以及输入图像的尺寸,
# 因此需要计算出来:linear_input_size
def conv2d_size_out(size, kernel_size = 5, stride = 2):
return (size - (kernel_size - 1) - 1) // stride + 1
convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
linear_input_size = convw * convh * 32
self.head = nn.Linear(linear_input_size, outputs)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.head(x.view(x.size(0), -1))
输入抽取(Input extraction)
下面的代码是从环境中提取和处理渲染图像的. 它使用了 torchvision 包, 该包的使用使得 组合不同的图像变换变得很容易。 运行该cell后,它将显示提取的图像块(patch)。
resize = T.Compose([T.ToPILImage(),
T.Resize(40, interpolation=Image.CUBIC),
T.ToTensor()])
def get_cart_location(screen_width):
world_width = env.x_threshold * 2
scale = screen_width / world_width
return int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART
def get_screen():
# Returned screen requested by gym is 400x600x3, but is sometimes larger
# such as 800x1200x3. Transpose it into torch order (CHW).
screen = env.render(mode='rgb_array').transpose((2, 0, 1))
# Cart is in the lower half, so strip off the top and bottom of the screen
_, screen_height, screen_width = screen.shape
screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
view_width = int(screen_width * 0.6)
cart_location = get_cart_location(screen_width)
if cart_location < view_width // 2:
slice_range = slice(view_width)
elif cart_location > (screen_width - view_width // 2):
slice_range = slice(-view_width, None)
else:
slice_range = slice(cart_location - view_width // 2,
cart_location + view_width // 2)
# Strip off the edges, so that we have a square image centered on a cart
screen = screen[:, :, slice_range]
# Convert to float, rescale, convert to torch tensor
# (this doesn't require a copy)
screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
screen = torch.from_numpy(screen)
# Resize, and add a batch dimension (BCHW)
return resize(screen).unsqueeze(0).to(device)
env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
interpolation='none')
plt.title('Example extracted screen')
plt.show()
训练(Training)
超参数与辅助函数
下面的代码实现了我们的模型及其优化器,并且定义了一些实用工具函数:
select_action - 将根据epsilon贪婪策略选择动作。简单地说,我们有时会使用我们的模型来选择动作, 有时我们只是在所有可能的动作集合中均匀采样一个。 通过均匀采样随机选择一个动作的概率将从 EPS_START 开始,并呈指数衰减,朝 EPS_END 结束。 EPS_DECAY 控制着衰减速率。
plot_durations - 该辅助函数用来绘制每集剧情的持续时间,以及过去的最近100集(last 100 episodes)的平均持续时间(官方评估中使用的度量标准)。 绘图将在包含主训练循环的单元下面,并且将在每一集之后更新(update after every episode)。
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10
# 获取屏幕大小,以便我们可以根据Gym返回的形状(shape)正确初始化网络层。
# 此时的典型尺寸接近 3x40x90,这是 get_screen() 中压缩和缩小渲染缓冲区的结果
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape
# 从 Gym 的动作空间获得动作的数量
n_actions = env.action_space.n
policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1)[1].view(1, 1)
else:
return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations():
plt.figure(2)
plt.clf()
durations_t = torch.tensor(episode_durations, dtype=torch.float)
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
display.clear_output(wait=True)
display.display(plt.gcf())
训练循环(Training loop)
最后,给出训练模型的代码.
下面, 函数 optimize_model 将执行一个单步优化。 它首先采样一个批次的样本(batch), 将所有张量连接成一个张量 并计算 Q(st,at)
和 V(st+1)=maxaQ(st+1,a), 然后将它们组合进我们的损失(loss)中. 根据定义,如果 s 是一个终止状态,则设定 V(s)=0 。 为了给算法增加稳定性,我们还使用一个目标网络(target network)来计算 V(st+1)
。 目标网络的权重大部分时间保持冻结状态,但每隔一段时间就会用策略网络(policy network)的权重更新一次。 更新间隔通常是若干优化步(optimizition steps),但为了简单起见,我们将使用剧集(episodes)为更新间隔单位。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (看 https://stackoverflow.com/a/19343/3343043 详细解释
# ). 把 Transitions 的 batch-array 转换为 batch-arrays 的 Transition 。
batch = Transition(*zip(*transitions))
# 计算非最终状态的mask 并把批次样本串接(concantecate)起来
# (一个最终状态(final state)是指在该状态上(仿真)游戏就结束了)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# 计算 Q(s_t, a) - 模型计算 Q(s_t), 然后我们在动作列中选择动作
# 这些是根据策略网络(policy_net)对batch中每个状态所采取的操作
state_action_values = policy_net(state_batch).gather(1, action_batch)
# 计算所有下一个状态的 V(s_{t+1})
# 对非最终下一个状态的动作的期望值是基于“旧的”target_net进行计算的;
# selecting their best reward with max(1)[0].
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
# 计算期望 Q 值
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# 计算 Huber loss
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
# 优化模型
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
param.grad.data.clamp_(-1, 1)
optimizer.step()
下面,你可以找到主要的训练循环。在开始时,我们重置环境并初始化状态张量。 然后,我们采样一个动作,执行它,观察下一个屏幕和奖励(总是1),并优化我们的模型一次。 当一次episode结束时(我们的模型失败,game over),我们重新启动循环。
下面的代码中, num_episodes 设置的较小. 你应该下载notebook并运行更多的epsiodes, 比如300+以获得有意义的持续时间改进。
num_episodes = 50
for i_episode in range(num_episodes):
# 初始化环境与状态
env.reset()
last_screen = get_screen()
current_screen = get_screen()
state = current_screen - last_screen
for t in count():
# 选择并执行一个动作
action = select_action(state)
_, reward, done, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
# 观察一个新的状态
last_screen = current_screen
current_screen = get_screen()
if not done:
next_state = current_screen - last_screen
else:
next_state = None
# 将状态转移保存到记忆内存(memory)中
memory.push(state, action, next_state, reward)
# 移动到下一个状态
state = next_state
# 执行一步优化过程(on the target network)
optimize_model()
if done:
episode_durations.append(t + 1)
plot_durations()
break
# 更新目标网络, copying all weights and biases in DQN
if i_episode % TARGET_UPDATE == 0:
target_net.load_state_dict(policy_net.state_dict())
print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()
这是一个图表,说明了整个数据流是如何产生的。
../_images/reinforcement_learning_diagram.jpg
动作(Actions)可以随机选择,也可以基于策略选择, u接着从gym环境中获得下一步到达的状态. 我们将结果记录在回放记忆/内存(replay memory)中,并在每次迭代中运行优化步骤。 优化器从replay memory中随机选取一个批次的样本来进行新策略的训练。 “旧的” target_net 也会被用于优化中以计算期望的Q值;它被偶尔更新以保持其最新。