DQN-1

DQN-1

​ Q-learning算法中,我们以矩阵的方式建立了一张存储每个状态下所有动作 的 Q 值的表格。表格中的每一个动作价值 Q(s,a)Q(s, a) 表示在状态s 下选择动作 aa 然后继续遵循某一策 略预期得到的期望回报。然而,这种用表格存储动作价值的做法只在环境的状态和动作都是离散 的,并且空间都比较小的情况下适用

​ 对于这种情况,我们需要用函数拟合的方法来估计 Q 值,即将这个复杂的 Q 值表格视作数据,使用一个参数化的函数 Q 来拟合这些数据,DQN 就是通过神经网络来拟合函数

对神经网络的分析:

  • 一种方法是:神经网络的输入是状态 ss 和动作 aa,然后输出一个标量,表示在状态 ss 下 采取动作 aa 能获得的价值
  • 若动作是离散(有限)的,还可以只将状态 ss 输入到神经网络中,使其同时输出每一 个动作的 QQ 值。通常 DQN(以及 Q-learning)只能处理动作离散的情况,因为在函数 Q 的更新过 程中有 maxamax_a 这一操作

​ 那么就有一个问题,神经网络的损失函数是什么呢?Q-learning 的更新规则为:

Q(s,a)Q(s,a)+α[r+γmaxaAQ(s,a)Q(s,a)]Q(s,a)\leftarrow Q(s,a)+\alpha[r+\gamma \max_{a'\in A}Q(s',a')-Q(s,a)]

其中 $r+\gamma \max_{a’\in A}Q(s’, a’)-Q(s,a) $ 称为TD误差(时间差分误差),但是上面这个更新在 DQN 中不复存在,更新变为对神经网络参数的更新,那我们应该如何设计 loss 函数呢?

上述公式的目的为用 r+γmaxaAQ(s,a)r+\gamma \max_{a'\in A}Q(s',a') 来近似 Q(s,a)Q(s,a),我们于是我们把这个拟合的均方误差认为是神经网络的误差:

loss=12Ni=1N[Qw(s,a)(r+γmaxaAQw(s,a))]2loss = \frac{1}{2N}\sum_{i=1}^N\left[Q_w(s,a)-(r+\gamma \max_{a' \in A} Q_w(s', a')) \right]^2

经验回放:

​ 在一般的有监督学习中,假设训练数据是独立同分布的,每一个训练数 据会被使用多次在原来的 Q-learning 算法中,每一个数据只会用来更新一次Q 值。同样地 DQN 算法采用了经验回放(experience replay)方法,具体做法为维护一个回放缓冲区,将每次从环境中采样得到的四元组数据(状态、动作、奖励、 下一状态)存储到回放缓冲区中,训练 Q 网络的时候再从回放缓冲区中随机采样若干数据来进行训练

  • 使样本满足独立假设。在MDP 中交互采样得到的数据本身不满足独立假设,因为这一时刻的状态和上一时刻的状态有关。非独立同分布的数据对训练神经网络有很大的影响,会使神经网络拟合到最近训练的数据上。采用经验回放可以一定程度上打破样本之间的相关性
  • 提高样本效率。每一个样本可以被使用多次

目标网络:

​ 由于DQN的环境是会实时改变的,在更新网络参数的同时目标也在不断地改变,这非常容易造成神经网络训练的不稳定性。为了解决这一问题,DQN 便使用了目标网络( target network)的思想: 既然在训练过程中 Q 网络的不断更新会导致目标不断发生改变,不如暂时先将 TD 误差目标中 的 Q 网络固定住。为了实现这一思想,我们需要利用两套 Q 网络:

  1. 原来的(训练网络)用于生成 Qw(s,a)Q_w(s,a) 的网络,继续用损失函数 12Ni=1N[Qw(s,a)(r+γmaxaAQw(s,a))]2\frac{1}{2N}\sum_{i=1}^N\left[Q_w(s,a)-(r+\gamma \max_{a' \in A} Q_{w'}(s', a')) \right]^2 进行梯度下降更新
  2. 新的网络(目标网络)用于生成 Qw(s,a)Q_{w'}(s,a),原网络每更新 CC 次,目标网络才与训练网络同步一次,即 www'\leftarrow w这样做使目标网络相对于训练网络更加稳定

综上所述, DQN 算法的具体流程如下:

用随机的网络参数 ω\omega 初始化网络 Qn(s,a)Q_{n}\left( s,a \right)

复制相同的参数 ωω\omega {\leftarrow} \omega 来初始化目标网络 QωQ_{\omega} ,

初始化经验回放池 RR

​ for 序列 e=1Ee = 1 {\rightarrow} E do

​ 获取环境初始状态 s1s_{1}

​ for 时间步 t=1Tt = 1 {\rightarrow} T do

​ 根据当前网络 Qp(s,a)Q_p\left( s,a \right) ,以 ϵ{\epsilon}_- 贪婪策略选择动作 ata_{t}

​ 执行动作 ata_{t} ,获得回报 rtr_{t} ,环境状态变为 St+1S_{t + 1}

​ 将 (st,at,rt,st+1)\left( s_{t},a_{t},r_{t},s_{t + 1} \right) 存储在回放池 RR

​ 若 RR 中数据足够,从 RR 中采样 NN 个数据 {(si,ai,si+1)}i=1,,N\left\{\left( s_{i},a_{i},s_{i + 1} \right) \right\}_{i = 1,{\cdots},N}

​ 对每个数据,用目标网络计算 yi=ri+γmaxaQω(si+1,a)y_i = r_i + \gamma\max_a Q_{\omega^-}\left( s_{i + 1},a \right)

​ 最小化目标损失 L = \frac{1}{N}{\sum}\limits_i{\left( y_{i} {-} Q_{\omega}\left( {s}_{i},{a}_{i} \right) \right)}^{2} ,以此更新当前网络 QωQ_{\omega}

​ 更新目标网络

​ end for

end for

应用代码实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import random
import gymnasium as gym
import numpy as np
import collections
from tqdm import tqdm
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

class ReplayBuffer(object):
''' 经验回放池 '''
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity) # 队列,先进先出

def add(self, state, action, reward, next_state, done): # 将数据加入buffer
self.buffer.append((state, action, reward, next_state, done))

def sample(self, batch_size): # 从buffer中采样数据,数量为batch_size
# 用于从指定的序列(列表、元组、字符串等)中随机无放回地抽取指定数量的元素
transitions = random.sample(self.buffer, batch_size)
# zip(*):将列表中的每个元组拆开,然后将相同位置的元素配对在一起
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done

def size(self): # 目前buffer中数据的数量
return len(self.buffer)

class Qnet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(Qnet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

def forward(self, x):
x = F.relu(self.fc1(x)) # 隐藏层使用ReLU激活函数
return self.fc2(x)

class DQN(object):
''' DQN算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
epsilon, target_update, device):
self.action_dim = action_dim

self.q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device) # Q网络

# 目标网络
self.target_q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device)
self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
self.gamma = gamma # 折扣因子
self.epsilon = epsilon # epsilon-贪婪策略
self.target_update = target_update # 目标网络更新频率
self.count = 0 # 计数器,记录更新次数
self.device = device

def take_action(self, state): # epsilon-贪婪策略采取动作
if np.random.random() < self.epsilon:
action = np.random.randint(self.action_dim)
else:
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.q_net(state).argmax().item()
return action

def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

q_values = self.q_net(states).gather(1, actions) # Q值
# 下个状态的最大Q值
max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)

# 计算目标Q值,当一个episode结束时(dones == True),未来的奖励应当被忽略,因此这部分乘以0。
# 相反,如果游戏还在继续(dones == False),则乘以1,保留对未来奖励的考虑
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones) # TD误差目标
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets)) # 均方误差损失函数

self.optimizer.zero_grad()
dqn_loss.backward()
self.optimizer.step()

if self.count % self.target_update == 0:
self.target_q_net.load_state_dict(self.q_net.state_dict()) # 更新目标网络
self.count += 1


lr = 2e-3
num_episodes = 500
hidden_dim = 128
gamma = 0.98
epsilon = 0.01
target_update = 10
buffer_size = 10000
minimal_size = 500
batch_size = 64
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v1'
env = gym.make(env_name)
random.seed(0)
np.random.seed(0)
# env.seed(0)
torch.manual_seed(0)
replay_buffer = ReplayBuffer(buffer_size)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon,
target_update, device)

return_list = []
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
episode_return = 0
state = env.reset()[0]
done = False
while not done:
action = agent.take_action(state)
next_state, reward, done, _, _ = env.step(action)
replay_buffer.add(state, action, reward, next_state, done)
state = next_state
episode_return += reward

# 当buffer数据的数量超过一定值后,才进行Q网络训练
if replay_buffer.size() > minimal_size:
b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
transition_dict = {
'states': b_s,
'actions': b_a,
'next_states': b_ns,
'rewards': b_r,
'dones': b_d
}
agent.update(transition_dict)

return_list.append(episode_return)
if (i_episode + 1) % 10 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DQN on {}'.format(env_name))
plt.show()