DQN-2

DQN的改进算法

Double DQN

​ 普通的 DQN 算法通常会导致对Q 值的过高估计(overestimation)。原因在与 Q 网络的自举(bootstrapping):

​ 传统 DQN 优化的 TD 误差目标为:

r+γmaxaAQw(s,a)r+\gamma \max_{a'\in A}Q_{w^-}(s',a')

其中,maxaQw(s,a)\max_{a'}Q_{w-}(s',a') 由参数为 ww^- 的目标网络计算得出,其中 max\max 操作可以视为由两部分构成:

  • 选取状态 ss' 下的最优动作 a=argmaxaQa^*=\arg \max_{a'}Q
  • 再计算该动作对应的价值 Qw(s,a)Q_{w^-}(s', a')

​ 当这两部分共同采用同一套 Q 网络的时候,每次得到的都是神经网络中估算所有动作状态的最大值,考虑神经网络会不断累计正误差,DQN 的过高估计问题会非常严重,本质上的原因在于:

  • 探索不足:探索不足导致产生的样本无法反应出概率分布: π:p(s,rs,a)\pi:p\left( s',r {\mid} s,a \right)
  1. 值函数存在方差: Q(s,a)Q(s,a) 的更新存在方差,不可能一步就更新到目标值,它实际的形式应该是围绕目标值上下波动,但是波动的幅度越来越小

  2. 贪心思想将高估的值进行了传递:训练过程中,实际的 Q(s,a)Q\left( s',a' \right) 其实是一个估计值,它和最优的 Q(s,a)Q^{*}\left( s',a' \right) 存在误差,存在一些动作对应的值被高估,而贪心思想又在更新 Q(s,a)Q(s,a) 时使用了下一个状态 ss' 中最大的 maxaQ(s,a)\max_{a'}Q\left( s',a' \right) ,恰好会使得那些被高估的动作的再更新时不断反向传播到之前的状态

​ 为了解决这一问题,Double DQN 算法提出利用两套独立训练的神经网络来估算 maxa‘’Q(s,a)\max\limits_{ a‘’}Q^*\left(s', a' \right),具体做法是利用 一套神经网络 QωQ_{\omega} 的输出来选取价值最大的动作,但在使用该动作的价值时,用另一套神经网 络 QarQ_{ar} 来计算该动作的价值。这样即使其中一套神经网络的某个动作存在比较严重的过高估计问题,由于另一套神经网络的存在,这个动作最终使用的 QQ 值也不会存在很大的过高估计

优化目标:

​ 我们设选取动作网络参数为 ww(训练网络),预测 Q 值网络参数为 ww^-(目标网络),则通过之前的分析,我们的优化目标为:

r+γQw(s,argmaxaQw(s,a))r+\gamma Q_{w^-}(s', \arg\max_{a'}Q_{w}(s', a'))

Dueling DQN

​ Dueling DQN是 DQN的另一种改进算法,它在传统 DQN 的基础上只进行了微小的改动,但能大幅提升 DQN的表现

​ 它从另外一个方面考虑 DQN 的问题:每次更新 Q 网络的时候,我们只是对网络中的最大值进行反向传播,而不管其他的 Q 值,这导致不同动作的 Q 值可能相差过大:

普通DQN

Dueling-DQN

因此在 Dueling DQN 中,引入优势函数的概念:

​ 将动作价值函数 QQ 减去状态价值函数 VV 的结果定义为优势函数 AA,即 A(s,a)=Q(s,a)V(s)A(s,a)=Q(s,a)-V(s),则在同一状态下,所有动作的优势值之和为 0(因为所有动作价值的期望就是这个状态的状态价值)

​ 因此 Q 网络被建模为:

Qη,α,β(s,a)=Vη,α(s)+Aη,β(s,a)Q_{\eta, \alpha, \beta}(s,a)=V_{\eta, \alpha}(s)+A_{\eta, \beta}(s,a)

DQN与dueling-DQN网络结构差异

Dueling DQN 的问题:

​ 对于公式 Qη,α,β(s,a)=Vη,α(s)+Aη,β(s,a)Q_{\eta,\alpha,\beta}\left( s,a\right) = V_{\eta,\alpha}\left( s \right) + A_{\eta,\beta}\left( s,a\right),它存在对于 VV 值和 AA 值建模不唯一性的问题。例如,对于同样的 QQ 值,如果将 VV 值加上任意大小的常数 CC ,再将所有 AA 值减去 CC ,则得到的 QQ 值依然不变,这就导致了训练的不稳定性。为了解决这一问题,Dueling DQN 强制最优动作的优势函数的实际输出为 0 , 即:

Q_{\eta,\alpha,\beta}\left(s,a \right) = V_{\eta,\alpha}\left( {s} \right) + A_{\eta,\beta}\left( {s}, {a} \right) {-} {\max}\limits_{ {d}}A_{\eta,\beta}\left( {s}, a' \right)

此时 V(s)=maxaQ(s,a)V(s) = \max_aQ(s,a) ,可以确保 VV 值建模的唯一性。在实现过程中,我们还可以用平均操作代替最大化操作,即:

Qη,α,β(s,a)=Vη,α(s)+Aη,β(s,a)1AdAη,β(s,a)Q_{\eta,\alpha,\beta}\left(s,a\right) = V_{\eta,\alpha}\left( s\right) + A_{\eta,\beta}\left(s, a \right) - \frac{1}{|A|}\sum_dA_{\eta,\beta}\left(s,a' \right)

​ 此时 V(s)=1AdQ(s,a)V(s) = \frac{1}{|A|}\sum_d Q( s,a'),虽然它不再满 足贝尔曼最优方程,但实际应用时更加稳定

两个方法的本质

​ 无论是 Double DQN 还是 Dueling DQN,他们都是通过优化神经网络结构的方式来改进 DQN 中 QQ 值的预测问题,无论是 Double DQN 改进过高的 QQ 值预测,还是 Dueling DQN 改进最优 QQ 值的预测,都是通过优化神经网络结构来更好地在小样本状态下进行预测

代码实现:

一些工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class RelayBuffer(object):
def __init__(self, max_length):
self.buffer = collections.deque(max_length=max_length)

def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))

def sample(self, batch_size):
return random.sample(self.buffer, batch_size)

def size(self):
return len(self.buffer)

class Qnet(nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super().__init__()
self.linear1 = nn.Linear(state_dim, hidden_dim)
self.activation = nn.ReLU()
self.linear2 = nn.Linear(hidden_dim, action_dim)

def forward(self, x):
x = self.linear1(x)
x = self.activation(x)
x = self.linear2(x)
return x

class VAnet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(VAnet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim) # 共享网络部分
self.fc_A = torch.nn.Linear(hidden_dim, action_dim)
self.fc_V = torch.nn.Linear(hidden_dim, 1)

def forward(self, x):
x = self.fc1(x)
A = self.fc_A(F.relu(self.fc1(x)))
V = self.fc_V(F.relu(self.fc1(x)))
Q = V + A - torch.mean(A, dim=1, keepdim=True)
return Q

实现与训练代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class DQN(object):
def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
epsilon, target_update, type='ValillaDQN'):

# 配置全连接层的参数
self.action_dim = action_dim
self.hidden_dim = hidden_dim
self.state_dim = state_dim


if type == 'DuelingDQN':
self.q_net = VAnet(state_dim, hidden_dim, action_dim)
self.target_q_net = VAnet(state_dim, hidden_dim, action_dim)

elif type == 'ValillaDQN' or 'DoubleDQN':
self.q_net = Qnet(state_dim, hidden_dim, action_dim)
self.target_q_net = Qnet(state_dim, hidden_dim, action_dim)

else:
assert False, 'DQN type error'

self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)

self.learning_rate = learning_rate # 更新时候的alpha值
self.gamma = gamma # 折扣因子
self.epsilon = epsilon # epsilon-greedy策略
self.target_update = target_update # 目标网络更新频率
self.count = 0 # 记录更新次数,用于target网络延迟更新
self.type = type

def take_action(self, state):
""" 实现epsilon-greedy策略 """
if np.random.random() < self.epsilon:
action = np.random.randint(self.action_dim)
else:
state = torch.tensor(state, dtype=torch.float, device=device)
action = self.q_net(state).argmax().item()

return action

def max_q_value(self, state):
state = torch.tensor([state], dtype=torch.float, device=device)
return self.q_net(state).max().item()
def update(self, transition):
""" train 函数中的内部部分 """
# state, action, reward, next_state, done = zip(*transition)
state, action, reward, next_state, done = transition
states = torch.tensor(state, dtype=torch.float, device=device)
actions = torch.tensor(action, dtype=torch.int64, device=device).reshape(-1, 1)
rewards = torch.tensor(reward, dtype=torch.float, device=device).reshape(-1, 1)
next_states = torch.tensor(next_state, dtype=torch.float, device=device)
dones = torch.tensor(done, dtype=torch.float, device=device).reshape(-1, 1)

q_values = self.q_net(states).gather(1, actions)
if self.type == 'DuelingDQN' or 'ValillaDQN':
max_next_q_values, _ = torch.max(self.q_net(states), dim=1, keepdim=True)
elif self.type == 'DoubleDQN':
max_action = torch.argmax(self.q_net(next_states), dim=1, keepdim=True)
max_next_q_values = self.target_q_net(next_state).gather(1, max_action)


q_target = rewards + self.gamma * max_next_q_values * (1 - dones)
dqn_loss = torch.mean(F.mse_loss(q_values, q_target))

self.optimizer.zero_grad()
dqn_loss.backward()
self.optimizer.step()

if self.count % self.target_update == 0:
self.target_q_net.load_state_dict(self.q_net.state_dict())
self.count = 0
self.count += 1

lr = 1e-2
num_episodes = 200
hidden_dim = 128
gamma = 0.98
epsilon = 0.01
target_update = 50
buffer_size = 5000
minimal_size = 1000
batch_size = 64

env_name = 'Pendulum-v1'
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = 11 # 将连续动作分成11个离散动作

def dis_to_con(discrete_action, env, action_dim): # 离散动作转回连续的函数
action_lowbound = env.action_space.low[0] # 连续动作的最小值
action_upbound = env.action_space.high[0] # 连续动作的最大值
return action_lowbound + (discrete_action / (action_dim - 1) * (action_upbound - action_lowbound))

def train_DQN(agent, env, num_episodes, replay_buffer, minimal_size, batch_size):
return_list = []
max_q_value_list = []
max_q_value = 0
for i in range(10):
with tqdm(total=int(num_episodes / 10),
desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
episode_return = 0
state, _ = env.reset()
done = False
while not done:
action = agent.take_action(state)
max_q_value = agent.max_q_value(state) * 0.005 + max_q_value * 0.995 # 平滑处理
max_q_value_list.append(max_q_value) # 保存每个状态的最大Q值
action_continuous = dis_to_con(action, env, agent.action_dim)
next_state, reward, done, _, _ = env.step([action_continuous])
replay_buffer.add(state, action, reward, next_state, done)
state = next_state
episode_return += reward
if replay_buffer.size() > minimal_size:
transition = replay_buffer.sample(batch_size)
agent.update(transition)
return_list.append(episode_return)
if (i_episode + 1) % 10 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)
return return_list, max_q_value_list