{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# DQN in pytorch\n", "\n", "The goal of this exercise is to implement DQN using pytorch and to apply it to the cartpole balancing problem. \n", "\n", "The code is adapted from the Pytorch tutorial: ." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " import google.colab\n", " IN_COLAB = True\n", "except:\n", " IN_COLAB = False\n", "\n", "if IN_COLAB:\n", " !pip install -U gymnasium pygame swig\n", " !pip install -U moviepy==1.0.3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Default libraries\n", "import math\n", "import random\n", "import os\n", "import time\n", "import numpy as np\n", "rng = np.random.default_rng()\n", "import matplotlib.pyplot as plt\n", "from collections import namedtuple, deque\n", "\n", "# Gymnasium\n", "import gymnasium as gym\n", "print(\"gym version:\", gym.__version__)\n", "\n", "# pytorch\n", "import torch\n", "import torch.nn.functional as F\n", "\n", "# Select hardware: \n", "if torch.cuda.is_available(): # GPU\n", " device = torch.device(\"cuda\")\n", "elif torch.backends.mps.is_available(): # Metal (Macos)\n", " device = torch.device(\"mps\")\n", "else: # CPU\n", " device = torch.device(\"cpu\")\n", "print(f\"Device: {device}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from moviepy.editor import ImageSequenceClip, ipython_display\n", "\n", "class GymRecorder(object):\n", " \"\"\"\n", " Simple wrapper over moviepy to generate a .gif with the frames of a gym environment.\n", " \n", " The environment must have the render_mode `rgb_array_list`.\n", " \"\"\"\n", " def __init__(self, env):\n", " self.env = env\n", " self._frames = []\n", "\n", " def record(self, frames):\n", " \"To be called at the end of an episode.\"\n", " for frame in frames:\n", " self._frames.append(np.array(frame))\n", "\n", " def make_video(self, filename):\n", " \"Generates the gif video.\"\n", " directory = os.path.dirname(os.path.abspath(filename))\n", " if not os.path.exists(directory):\n", " os.mkdir(directory)\n", " self.clip = ImageSequenceClip(list(self._frames), fps=self.env.metadata[\"render_fps\"])\n", " self.clip.write_gif(filename, fps=self.env.metadata[\"render_fps\"], loop=0)\n", " del self._frames\n", " self._frames = []" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cartpole balancing task\n", "\n", "We are going to use the Cartpole balancing problem, which can be loaded with:\n", "\n", "```python\n", "gym.make('CartPole-v0', render_mode=\"rgb_array_list\")\n", "```\n", "\n", "States have 4 continuous values (position and speed of the cart, angle and speed of the pole) and 2 discrete outputs (going left or right). The reward is +1 for each transition where the pole is still standing (angle of less than 30° with the vertical). \n", "\n", "In CartPole-v0, the episode ends when the pole fails or after 200 steps. In CartPole-v1, the maximum episode length is 500 steps, which is too long for us, so we stick to v0 here.\n", "\n", "The maximal (undiscounted) return is therefore 200. Can DQN learn this?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the environment\n", "env = gym.make('CartPole-v0', render_mode=\"rgb_array_list\")\n", "recorder = GymRecorder(env)\n", "\n", "# Sample the initial state\n", "state, info = env.reset()\n", "\n", "# One episode:\n", "done = False\n", "return_episode = 0\n", "while not done:\n", "\n", " # Select an action randomly\n", " action = env.action_space.sample()\n", " \n", " # Sample a single transition\n", " next_state, reward, terminal, truncated, info = env.step(action)\n", "\n", " # End of the episode\n", " done = terminal or truncated\n", "\n", " # Update undiscounted return\n", " return_episode += reward\n", " \n", " # Go in the next state\n", " state = next_state\n", "\n", "print(\"Return:\", return_episode)\n", "recorder.record(env.render())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "video = \"videos/cartpole_random.gif\"\n", "recorder.make_video(video)\n", "ipython_display(video)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Value network in pytorch\n", "\n", "As the state in Cartpole has only four dimensions, we do not need a CNN for the value network. A simple MLP with a couple of hidden layers will be enough.\n", "\n", "**Q:** Create a MLP class in pytorch taking four inputs and two outputs (one Q-value per action), and two hidden layers of 128 neurons (you can change it later). If possible, make it parameterizable, i.e. have the constructor take in the number of inputs, outputs and hidden neurons. The activation function for the hidden layers should be ReLU." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Create a network, an environment, get the initial state using `env.reset()` and pass it to the `forward()` method of your NN. What happens?\n", "\n", "Do not forget to send the network to your device, especially if you have a GPU. Create the network using something like:\n", "\n", "```python\n", "net = MLP(...).to(device)\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alright, we need to cast the state vector into a pytorch tensor, pytorch does not do it automatically.\n", "\n", "To cast a numpy vector of shape (4,) into a tensor, one simply needs to call:\n", "\n", "```python\n", "state = torch.tensor(state, dtype=torch.float32, device=device)\n", "```\n", "\n", "The dtype must be set to `torch.float32` for floating numbers. Integers should be set to `torch.long`. Do not forget to send the tensor to your device if you plan to pass it to your network.\n", "\n", "**Q:** Pass the new tensor to your network. What is the shape of the output tensor?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The value network outputs one Q-value per action, great. Now, let's identify the **greedy** action, i.e. the one with the highest Q-value. The two actions expected by the cartpole environment are 0 and 1, i.e. the index of the element with the highest Q-value as a Python integer. \n", "\n", "Have a look at those two methods of `Tensor`:\n", "\n", "* ``Tensor.argmax``: \n", "* ``Tensor.item``: \n", "\n", "**Q:** Find a way to obtain the index (as a Python integer) of the element with the highest value in the tensor of Q-values. Check that it works. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Create a dummy agent class (as in the previous exercises) storing a value network and acting using $\\epsilon$-greedy action selection. Add a ``test()``method running a few episodes and possibly recording them. \n", "\n", "The constructor should accept several hyperparameters, such as the `config` dictionary in the following skeleton:\n", "\n", "```python\n", "class RandomDQNAgent:\n", " def __init__(self, env, config):\n", " def act(self, state):\n", " def test(self, nb_episodes, recorder=None):\n", "```\n", "\n", "but feel free to pass the hyperparameters one by one.\n", "\n", "To prepare ourselves, implement a schedule for `epsilon` in the `act()` method: epsilon should start at a high value of 0.9 and decrease exponentially to 0.05 for each action made. The value of epsilon follows this formula:\n", "\n", "$$\n", " \\epsilon = 0.05 + (0.9 - 0.05) * \\exp ( - \\dfrac{t}{1000})\n", "$$\n", "\n", "where t is the number of steps since the start. 0.05, 0.9 and 1000 should be parameters of the class." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Target network\n", "\n", "The original DQN algorithm implies two neural networks:\n", "\n", "1. The value network $Q_\\theta(s, a)$, learning to predict the Q-values for the current state.\n", "2. The target network $Q_{\\theta'}(s, a)$, used to predict the Q-values in the next state.\n", "\n", "The target network is a copy of the value network (in terms of structure and parameters), but the update occurs only from time to time.\n", "\n", "**Q:** Create two MLPs of the same size and predict the Q-values of a single state. What happens?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Obviously, the two MLPs are initialized using random parameters, so they are different. We need a method to copy the weights of a network into another one. \n", "\n", "It is fortunately very easy to save/load the parameters of a pytorch network:\n", "\n", "```python\n", "params = net.state_dict()\n", "net.load_state_dict(params)\n", "```\n", "\n", "**Q:** Apply these methods to update the weights of the target network with the value one. Check that they now predict the same thing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Experience Replay Memory\n", "\n", "The second important component of DQN is the experience replay memory (ERM) or replay buffer. It is a limited size buffer that can store $(s, a, r, s', d)$ transitions, where $d$ is a boolean indicating whether the next state $s'$ is terminal or not (in gymnasium, this is the boolean `done = terminal or truncated`).\n", "\n", "Below is a simple implementation of an ERM. The important data structure here is `deque` (double-ended queue) which behaves like a list when `append()` is called, until its capacity is reached (`maxlen`), in which case new elements overwrite older ones. \n", "\n", "`batch = sample(batch_size)` randomly samples a minibatch from the ERM and returns a structure of $(s, a, r, s', d)$ transitions, nicely casted into pytorch tensors. These tensors are accessed with `batch.state`, `batch.action`, etc." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Named tuples are fancy dictionaries\n", "Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))\n", "\n", "class ReplayMemory(object):\n", " \"Simple Experience Replay Memory using uniform sampling.\"\n", "\n", " def __init__(self, capacity):\n", " self.memory = deque([], maxlen=capacity)\n", "\n", " def append(self, state, action, reward, next_state, done):\n", " \"Appends a transition (s, a, r, s', done) to the buffer.\"\n", "\n", " # Get numpy arrays even if it is a torch tensor\n", " if isinstance(state, (torch.Tensor,)): state = state.numpy(force=True)\n", " if isinstance(next_state, (torch.Tensor,)): next_state = next_state.numpy(force=True)\n", " \n", " # Append to the buffer\n", " self.memory.append(Transition(state, action, reward, next_state, done))\n", "\n", " def sample(self, batch_size):\n", " \"Returns a minibatch of (s, a, r, s', done)\"\n", "\n", " # Sample the batch\n", " transitions = random.sample(self.memory, batch_size)\n", " \n", " # Transpose the batch.\n", " batch = Transition(*zip(*transitions))\n", " \n", " # Cast to tensors\n", " states = torch.tensor(batch.state, dtype=torch.float32, device=device)\n", " actions = torch.tensor(batch.action, dtype=torch.long, device=device)\n", " rewards = torch.tensor(batch.reward, dtype=torch.float32, device=device)\n", " next_states = torch.tensor(batch.next_state, dtype=torch.float32, device=device)\n", " dones = torch.tensor(batch.done, dtype=torch.bool, device=device)\n", "\n", " return Transition(states, actions, rewards, next_states, dones)\n", "\n", " def __len__(self):\n", " return len(self.memory)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Modify your random DQN agent so that it stores a replay buffer of capacity 10000 and appends all transitions into it. Do a few episodes, sample a small minibatch and have a look at the data you obtain." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Use the value network stored into your agent to predict the Q-values of all actions for the states contained in the minibatch. Do NOT use a for loop. Check the size of the resulting tensor. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** The previous tensors returns the value of all actions in those visited states. We now want only the Q-value of action that was taken (whose index is in `batch.action`). The resulting tensor should therefore a vector of length `batch_size`. How do we do that?\n", "\n", "*Hint:* it would take months of practice to master all the indexing methods available in pytorch: . Meanwhile, numpy-style indexing could be useful. Check what the following statements do:\n", "\n", "```python\n", "N = 10\n", "A = torch.randn((N, 2))\n", "B = torch.randint(0, 2, (N,))\n", "C = A[range(N), B]\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DQN agent\n", "\n", "Good, we should now have all the elementary bricks to create our DQN agent. \n", "\n", "Reminder from the lecture:\n", "\n", "---\n", "\n", "* Initialize value network $Q_{\\theta}$ and target network $Q_{\\theta'}$.\n", "\n", "* Initialize experience replay memory $\\mathcal{D}$ of maximal size $N$.\n", "\n", "* for $t \\in [0, T_\\text{total}]$:\n", "\n", " * Select an action $a_t$ based on $Q_\\theta(s_t, a)$ ($\\epsilon$-greedy), observe $s_{t+1}$ and $r_{t+1}$.\n", "\n", " * Store $(s_t, a_t, r_{t+1}, s_{t+1})$ in the experience replay memory.\n", "\n", " * Every $T_\\text{train}$ steps:\n", "\n", " * Sample a minibatch $\\mathcal{D}_s$ randomly from $\\mathcal{D}$.\n", "\n", " * For each transition $(s_k, a_k, r_k, s'_k)$ in the minibatch:\n", "\n", " * Compute the target value $t_k = r_k + \\gamma \\, \\max_{a'} Q_{\\theta'}(s'_k, a')$ using the target network.\n", "\n", " * Update the value network $Q_{\\theta}$ on $\\mathcal{D}_s$ to minimize:\n", "\n", " $$\\mathcal{L}(\\theta) = \\mathbb{E}_{\\mathcal{D}_s}[(t_k - Q_\\theta(s_k, a_k))^2]$$\n", "\n", " * Every $T_\\text{target}$ steps:\n", "\n", " * Update target network: $\\theta' \\leftarrow \\theta$.\n", "---\n", "\n", "Create a DQN agent class inspired from the notebooks on MC or TD. The constructor should create the value and target networks, and make sure that their parameters are the same. It also creates an empty replay buffer. \n", "\n", "The `act()` method implements $\\epsilon$-greedy action selection, with an exponentially decaying schedule for $\\epsilon$. The greedy action is read from the value network.\n", "\n", "The `train()` and `test()` methods run training and test episodes as usual, with optional rendering. The train method should return (or store in the object) the return of each episode (its length) so we can plot it at the end. \n", "\n", "The main difficulty will be the `update()` method, where learning is supposed to happen. It should sample a minibatch from the replay memory, compute a vector of Bellman targets $r_t + \\gamma \\, \\max_a Q(s_{t+1}, a)$ for each transition in the batch, compute the loss function (mse between these targets and the predicted Q-values), backpropagate the gradients and apply the optimizer (Adam, but feel free to pick your preferred optimizer). Refer to the previous notebook on pytorch if you do not know how to do that.\n", "\n", "The main tricky part is when $V(s_{t+1})$ has to be predicted by the **target** network. You do not want the target network to learn from the minibatch, so it should not compute the corresponding gradients to save computational time. You can make sure that the target network is purely in inference mode with the following context:\n", "\n", "```python\n", "with torch.no_grad():\n", " next_Q_values = target_net(batch.next_state)\n", "```\n", "\n", "Of course you want the Q-value of the greedy action in the next state, not the vector of all Q-values, so check the doc of `Tensor.max()`. \n", "\n", "Importantly, when the next state $s'$ is terminal (either the agent failed or the 200 steps are over), the Bellman target should be simply $r_t$ instead of $r_t + \\gamma \\, \\max_a Q(s_{t+1}, a)$, as no action will be taken in the next state. This is why we saved the booleans `done` were saved in the replay buffer. As they are boolean, you can use them for indexing:\n", "\n", "```python\n", "Q = torch.randn((batch_size,))\n", "Q[batch.dones] = 0.0\n", "```\n", "\n", "A minor detail: do not start learning until the replay buffer is full enough, otherwise you will not fill your minibatch. Usually, there is no learning until the buffer contains two or three times the batch size. Use `len(memory)` to know the current number of stored transitions.\n", "\n", "Here is a set of suggested hyperparameters to help you start. Of course, it is strongly advised to modify them and observe their influence, but it depends on the remaining time.\n", "\n", "* $\\gamma = 0.99$.\n", "* MLP with two layers of 128 neurons, Adam optimizer with a fixed learning rate of 0.001.\n", "* Replay buffer of maximum capacity 10000, batch size of 128.\n", "* Target network updated every 120 steps.\n", "* Epsilon-greedy action selection, with the schedule:\n", "\n", "$$\n", " \\epsilon = 0.05 + (0.9 - 0.05) * \\exp ( - \\dfrac{t}{1000})\n", "$$\n", "\n", "where $t$ is the total number of steps.\n", "\n", "\n", "**Q:** Train a DQN on cartpole for 250 episodes. How would you characterize the learning process (speed, stability, etc.). If possible, do several runs. Vary the hyperparameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" } }, "nbformat": 4, "nbformat_minor": 2 }