-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdouble_dqn.py
352 lines (311 loc) · 13.9 KB
/
double_dqn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
"""Script que implementa la clase de agentes DDQN y permite entrenar y evaluar un agente.
Define la clase DoubleDQNAgent, una implementación de un agente Double DQN.
Al ejecutarse define, entrena y evalúa un agente DDQN en el entorno LunarLander-v2.
Fuente del código base utilizado: https://github.com/jcasasr/Aprendizaje-por-refuerzo/blob/main/M09
"""
import os
import random
import time
from copy import deepcopy
import gym
import numpy as np
import torch
from dqn import DQN
from playing import play_games_using_agent
from replay_buffer import ExperienceReplayBuffer
from utils import render_agent_episode, plot_rewards, plot_losses, plot_evaluation_rewards, save_agent_gif
class DoubleDQNAgent:
"""
Agente Double DQN.
"""
def __init__(self, env: gym.Env,
dnnetwork, buffer,
epsilon: float = 0.1,
eps_decay: float = 0.99,
min_epsilon: float = 0.01,
batch_size: int = 32):
"""
Inicializa el agente DDQN.
Args:
env: entorno gym
dnnetwork: red neuronal principal a entrenar
buffer: buffer de repetición a utilizar
epsilon: epsilon inicial
eps_decay: decaimiento de epsilon
min_epsilon: epsilon mínimo de entrenamiento
batch_size: tamaño del batch de entrenamiento
"""
self.env = env # Entorno
self.dnnetwork = dnnetwork # Red principal
self.target_network = deepcopy(dnnetwork) # red objetivo (copia de la principal)
self.buffer = buffer
self.epsilon = epsilon
self.eps_decay = eps_decay
self.min_epsilon = min_epsilon
self.batch_size = batch_size
self.nblock = 100 # bloque de los X últimos episodios de los que se calculará la media de recompensa
self.reward_threshold = self.env.spec.reward_threshold # recompensa media a partir de la cual se considera
# que el agente ha aprendido a jugar
# Otras variables inicializadas:
self.update_loss = []
self.training_rewards = []
self.mean_training_rewards = []
self.training_losses = []
self.sync_eps = []
self.total_reward = 0
self.step_count = 0
self.state0 = None
self.gamma = None # Defined on training
# Tomamos una nueva acción
def take_step(self, eps: float, mode: str = 'train'):
"""
Avanza un paso en el episodio.
Args:
eps: epsilon a utilizar por el agente
mode: modo de elección de acciones
"""
if mode == 'explore':
# acción aleatoria en el burn-in y en la fase de exploración (epsilon)
action = self.env.action_space.sample()
else:
# acción a partir del valor de Q (elección de la acción con mejor Q)
action = self.get_action(self.state0, eps)
self.step_count += 1
# Realizamos la acción y obtenemos el nuevo estado y la recompensa
new_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
self.total_reward += reward
self.buffer.append(self.state0, action, reward, terminated, new_state) # guardamos experiencia en el buffer
self.state0 = new_state.copy()
if done:
self.state0, _ = self.env.reset()
return done
# Entrenamiento
def train(self,
gamma: float = 0.99,
max_episodes: int = 50000,
dnn_update_frequency: int = 4,
dnn_sync_frequency: int = 2000):
"""
Entrena al agente.
Args:
gamma: valor de la constante gamma de la ecuación de Bellman
max_episodes: número máximo de episodios de entrenamiento
dnn_update_frequency: frecuencia de actualización de la red principal
dnn_sync_frequency: frecuencia de sincronización de las redes
Returns:
tiempo de entrenamiento (en minutos)
"""
start_time = time.time()
self.gamma = gamma
# Rellenamos el buffer con N experiencias aleatorias:
self.state0, _ = self.env.reset()
print("Filling replay buffer...")
while self.buffer.burn_in_capacity() < 1:
self.take_step(self.epsilon, mode='explore')
# Iniciamos el entrenamiento:
episode = 0
training = True
print("Training...")
while training:
self.state0, _ = self.env.reset()
self.total_reward = 0
done_game = False
while not done_game:
# El agente toma una acción
done_game = self.take_step(self.epsilon, mode='train')
# Actualizamos la red principal según la frecuencia establecida
if self.step_count % dnn_update_frequency == 0:
self.update()
# Sincronizamos la red principal y la red objetivo según la frecuencia establecida
if self.step_count % dnn_sync_frequency == 0:
self.target_network.load_state_dict(
self.dnnetwork.state_dict())
self.sync_eps.append(episode)
# Si el episodio ha concluido:
if done_game:
episode += 1
self.training_rewards.append(self.total_reward) # guardamos las recompensas obtenidas
self.training_losses.append(sum(self.update_loss) / len(self.update_loss))
self.update_loss = []
mean_rewards = np.mean( # calculamos la media de recompensa de los últimos X episodios
self.training_rewards[-self.nblock:])
self.mean_training_rewards.append(mean_rewards)
print("\rEpisode {:d} Mean Rewards {:.2f} Epsilon {}\t\t".format(
episode, mean_rewards, self.epsilon), end="")
# Comprobamos que todavía quedan episodios
if episode >= max_episodes:
print('\nEpisode limit reached.')
end_time = time.time()
return round((end_time - start_time) / 60, 2)
# Termina el juego si la media de recompensas ha llegado al umbral fijado para este juego
if mean_rewards >= self.reward_threshold:
print('\nEnvironment solved in {} episodes!'.format(
episode))
end_time = time.time()
return round((end_time - start_time) / 60, 2)
# Actualizamos epsilon según la velocidad de decaimiento fijada
self.epsilon = max(self.epsilon * self.eps_decay, self.min_epsilon)
# Cálculo de la pérdida
def calculate_loss(self, batch):
"""
Calcula la pérdida correspondiente a un batch de experiencias.
"""
# Separamos las variables de la experiencia y las convertimos a tensores
states, actions, rewards, dones, next_states = [i for i in batch]
rewards_vals = torch.FloatTensor(rewards).to(device=self.dnnetwork.device).reshape(-1, 1)
actions_vals = torch.LongTensor(np.array(actions)).to(
device=self.dnnetwork.device).reshape(-1, 1)
dones_t = torch.BoolTensor(dones).to(device=self.dnnetwork.device).reshape(-1, 1)
# Obtenemos los valores de Q de la red principal
qvals = torch.gather(self.dnnetwork.get_qvals(states), 1, actions_vals)
next_actions = torch.max(self.dnnetwork.get_qvals(next_states), dim=-1)[1]
next_actions_vals = next_actions.reshape(-1, 1)
# Obtenemos los valores de Q de la red objetivo
target_qvals = self.target_network.get_qvals(next_states)
qvals_next = torch.gather(target_qvals, 1, next_actions_vals).detach()
#####
qvals_next.masked_fill_(dones_t, 0) # 0 en estados terminales
# Calculamos la ecuación de Bellman
expected_qvals = self.gamma * qvals_next + rewards_vals
# Calculamos la pérdida
loss = torch.nn.MSELoss()(qvals, expected_qvals.reshape(-1, 1))
return loss
def update(self):
"""
Actualiza la red principal.
"""
self.dnnetwork.optimizer.zero_grad() # eliminamos cualquier gradiente pasado
batch = self.buffer.sample_batch(batch_size=self.batch_size) # seleccionamos un conjunto del buffer
loss = self.calculate_loss(batch) # calculamos la pérdida
loss.backward() # hacemos la diferencia para obtener los gradientes
self.dnnetwork.optimizer.step() # aplicamos los gradientes a la red neuronal
# Guardamos los valores de pérdida
if self.dnnetwork.device != 'cpu':
self.update_loss.append(loss.detach().cpu().numpy())
else:
self.update_loss.append(loss.detach().numpy())
def get_action(self, state, epsilon=0.01):
"""
Devuelve la acción a seguir según la política eps-greedy del agente en la observación 'state'.
Args:
state: observación del entorno
epsilon: epsilon a utilizar por la política epsilon-greedy
"""
if np.random.random() < epsilon:
action = np.random.choice(self.dnnetwork.actions) # acción aleatoria
else:
qvals = self.dnnetwork.get_qvals(state) # acción a partir del cálculo del valor de Q para esa acción
action = torch.max(qvals, dim=-1)[1].item()
return action
if __name__ == '__main__':
# Inicialización:
env_dict = {'id': 'LunarLander-v2', 'render_mode': 'rgb_array'}
environment = gym.make(**env_dict)
# Utilizamos la cpu porque en este caso es más rápida:
DEVICE = torch.device('cpu')
agent_name = "double_dqn"
agent_title = 'Agente Double DQN'
try:
os.mkdir(agent_name)
except FileExistsError:
pass
# Fijamos las semillas utilizadas, por reproducibilidad:
# Referencias:
# + https://pytorch.org/docs/stable/notes/randomness.html,
# + https://harald.co/2019/07/30/reproducibility-issues-using-openai-gym/
# + https://gymnasium.farama.org/content/migration-guide/
RANDOM_SEED = 666
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
environment.reset(seed=RANDOM_SEED)
environment.action_space.seed(RANDOM_SEED)
# Hyperparams:
MEMORY_SIZE = 10000 # Máxima capacidad del buffer
BURN_IN = 1000 # Número de pasos iniciales usados para rellenar el buffer antes de entrenar
MAX_EPISODES = 1000 # Número máximo de episodios (el agente debe aprender antes de llegar a este valor)
INIT_EPSILON = 1 # Valor inicial de epsilon
EPSILON_DECAY = .98 # Decaimiento de epsilon
MIN_EPSILON = 0.01 # Valor mínimo de epsilon en entrenamiento
GAMMA = 0.99 # Valor gamma de la ecuación de Bellman
BATCH_SIZE = 32 # Conjunto a coger del buffer para la red neuronal
LR = 0.001 # Velocidad de aprendizaje
DNN_UPD = 3 # Frecuencia de actualización de la red neuronal
DNN_SYNC = 1000 # Frecuencia de sincronización de pesos entre la red neuronal y la red objetivo
# Agent initialization:
er_buffer = ExperienceReplayBuffer(memory_size=MEMORY_SIZE, burn_in=BURN_IN)
# Reutilizamos la implementación de la red DQN:
double_dqn = DQN(env=environment, learning_rate=LR, device=DEVICE)
double_dqn_agent = DoubleDQNAgent(
env=environment,
dnnetwork=double_dqn,
buffer=er_buffer,
epsilon=INIT_EPSILON,
eps_decay=EPSILON_DECAY,
batch_size=BATCH_SIZE,
min_epsilon=MIN_EPSILON
)
# Agent training:
training_time = double_dqn_agent.train(
gamma=GAMMA,
max_episodes=MAX_EPISODES,
dnn_update_frequency=DNN_UPD,
dnn_sync_frequency=DNN_SYNC
)
print(f"Training time: {training_time} minutes.")
# double_dqn_agent.dnnetwork.load_state_dict(torch.load(f'{agent_name}/{agent_name}_Trained_Model.pth'))
# Training evaluation:
plot_rewards(
training_rewards=double_dqn_agent.training_rewards,
mean_training_rewards=double_dqn_agent.mean_training_rewards,
reward_threshold=environment.spec.reward_threshold,
title=agent_title,
save_file_name=f'{agent_name}/{agent_name}_rewards.png'
)
plot_losses(
training_losses=double_dqn_agent.training_losses,
title=agent_title,
save_file_name=f'{agent_name}/{agent_name}_losses.png'
)
# Saving:
torch.save(obj=double_dqn_agent.dnnetwork.state_dict(),
f=f'{agent_name}/{agent_name}_Trained_Model.pth')
# Evaluation:
eval_eps = 0
eval_games_seed = 0
tr, _ = play_games_using_agent(
environment_dict=env_dict,
agent=double_dqn_agent,
n_games=100,
games_seed=eval_games_seed,
eps=eval_eps
)
plot_evaluation_rewards(
rewards=tr,
reward_threshold=environment.spec.reward_threshold,
title=agent_title,
save_file_name=f'{agent_name}/{agent_name}_evaluation.png'
)
print(f"Rewards std: {tr.std()}")
print(f'well_landed_eval_episodes: {sum(tr >= 200)}')
print(f'landed_eval_episodes: {sum((tr < 200) & (tr >= 100))}')
print(f'crashed_eval_episodes: {sum(tr < 100)}')
# Rendering interesting games:
gif_games = sorted(np.where(tr < 200)[0])
render_env_dict = {'id': 'LunarLander-v2', 'render_mode': 'human'}
for episode_n in gif_games:
print(f"Rendering episode number {episode_n}.")
render_agent_episode(
env_dict=render_env_dict,
ag=double_dqn_agent,
game_seed=eval_games_seed + int(episode_n),
eps=eval_eps
)
# Saving random game:
save_agent_gif(
env_dict=env_dict,
ag=double_dqn_agent,
save_file_name=f'{agent_name}/agente_{agent_name}.gif',
eps=eval_eps
)