{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Eligibility traces" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " import google.colab\n", " IN_COLAB = True\n", "except:\n", " IN_COLAB = False\n", "\n", "if IN_COLAB:\n", " !pip install -U gymnasium pygame moviepy\n", " !pip install gymnasium[box2d]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "rng = np.random.default_rng()\n", "import matplotlib.pyplot as plt\n", "import os\n", "from IPython.display import clear_output\n", "\n", "import gymnasium as gym\n", "print(\"gym version:\", gym.__version__)\n", "\n", "import pygame\n", "from moviepy.editor import ImageSequenceClip, ipython_display\n", "\n", "class GymRecorder(object):\n", " \"\"\"\n", " Simple wrapper over moviepy to generate a .gif with the frames of a gym environment.\n", " \n", " The environment must have the render_mode `rgb_array_list`.\n", " \"\"\"\n", " def __init__(self, env):\n", " self.env = env\n", " self._frames = []\n", "\n", " def record(self, frames):\n", " \"To be called at the end of an episode.\"\n", " for frame in frames:\n", " self._frames.append(np.array(frame))\n", "\n", " def make_video(self, filename):\n", " \"Generates the gif video.\"\n", " directory = os.path.dirname(os.path.abspath(filename))\n", " if not os.path.exists(directory):\n", " os.mkdir(directory)\n", " self.clip = ImageSequenceClip(list(self._frames), fps=self.env.metadata[\"render_fps\"])\n", " self.clip.write_gif(filename, fps=self.env.metadata[\"render_fps\"], loop=0)\n", " del self._frames\n", " self._frames = []\n", "\n", "def running_average(x, N):\n", " kernel = np.ones(N) / N\n", " return np.convolve(x, kernel, mode='same')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Q-learning in Gridworld\n", "\n", "### Random interaction with the environment\n", "\n", "The goal of this exercise is to solve the **Gridworld** problem using Q-learning. The code is adapted from \n", "\n", "The agent is represented by the blue circle: the **state** $s$ of the agent is its position in the 5x5 grid, i.e. a number between 0 and 24.\n", "\n", "The agent can move either to the left, right, top or bottom. When the agent tries to move outside of the environment, it stays at its current position. There are four **actions** $a$ available, which are deterministic. \n", "\n", "Its goal is to reach the green circle, while avoiding the red ones. Actions leading to the green circle receive a reward $r$ of +100, actions leading to a red square receive a reward of -100. The episode ends in those states. All other actions have a reward of -1. An episode stops after 100 steps if a goal has not been reached." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GridWorldEnv(gym.Env):\n", " metadata = {\"render_modes\": [\"human\", \"rgb_array\", \"rgb_array_list\"], \"render_fps\": 4}\n", "\n", " def __init__(self, render_mode=None, size=5, rewards=[100, -100, -1]):\n", " self.size = size # The size of the square grid\n", " self.window_size = 512 # The size of the PyGame window\n", " self.rewards = rewards\n", " self._step = 0\n", "\n", " # The state is the flattened (x, y) coordinate of the agent\n", " self.observation_space = gym.spaces.Discrete(size**2)\n", "\n", " # Goal location\n", " self._target_location = np.array([3, 2], dtype=int)\n", " self._distractor1_location = np.array([3, 1], dtype=int)\n", " self._distractor2_location = np.array([2, 2], dtype=int)\n", "\n", " # We have 4 actions, corresponding to \"right\", \"up\", \"left\", \"down\"\n", " self.action_space = gym.spaces.Discrete(4)\n", "\n", " self._action_to_direction = {\n", " 0: np.array([1, 0]), # right\n", " 1: np.array([0, 1]), # down\n", " 2: np.array([-1, 0]), # left\n", " 3: np.array([0, -1]), # up\n", " }\n", "\n", " assert render_mode is None or render_mode in self.metadata[\"render_modes\"]\n", " self.render_mode = render_mode\n", "\n", " if self.render_mode == \"rgb_array_list\":\n", " self._frames = []\n", " self.window = None\n", " self.clock = None\n", " self.font = pygame.font.SysFont(None, 16)\n", " self.Q = np.zeros((self.observation_space.n, self.action_space.n))\n", "\n", "\n", " def _state2coordinates(self, state):\n", " \"Returns coordinates of a state.\"\n", " return (state % self.size, int(state/self.size))\n", "\n", " def _coordinate2state(self, coord):\n", " \"Returns the state with the coordinates.\"\n", " return coord[1] * self.size + coord[0]\n", "\n", " def reset(self, seed=None, options=None):\n", "\n", " self._step = 0\n", "\n", " # Initial location\n", " self._agent_location = np.array([0, 0], dtype=int)\n", "\n", " if self.render_mode == \"human\":\n", " self._render_frame()\n", " \n", " if self.render_mode == \"rgb_array_list\":\n", " self._frames = []\n", " self._render_frame()\n", "\n", " return self._coordinate2state(self._agent_location), {}\n", "\n", "\n", " def step(self, action):\n", "\n", " # Map the action (element of {0,1,2,3}) to the direction we walk in\n", " direction = self._action_to_direction[action]\n", " \n", " # We use `np.clip` to make sure we don't leave the grid\n", " self._agent_location = np.clip(\n", " self._agent_location + direction, 0, self.size - 1\n", " )\n", " \n", " # An episode is done if the agent has reached the target or the distractors\n", " if np.array_equal(self._agent_location, self._target_location):\n", " terminal = True\n", " reward = self.rewards[0]\n", " elif np.array_equal(self._agent_location, self._distractor1_location) \\\n", " or np.array_equal(self._agent_location, self._distractor2_location):\n", " terminal = True\n", " reward = self.rewards[1]\n", " else:\n", " terminal = False\n", " reward = self.rewards[2]\n", "\n", " if self.render_mode == \"human\" or self.render_mode == \"rgb_array_list\":\n", " self._render_frame()\n", "\n", " self._step += 1\n", " if self._step == 100:\n", " truncated = True\n", " else:\n", " truncated = False\n", "\n", " return self._coordinate2state(self._agent_location), reward, terminal, truncated, {}\n", "\n", " def render(self):\n", " if self.render_mode == \"rgb_array\":\n", " return self._render_frame()\n", " elif self.render_mode == \"rgb_array_list\":\n", " f = self._frames.copy()\n", " self._frames = []\n", " return f\n", "\n", " def _render_frame(self):\n", "\n", " if self.window is None and self.render_mode == \"human\":\n", " pygame.init()\n", " pygame.display.init()\n", " self.window = pygame.display.set_mode(\n", " (self.window_size, self.window_size)\n", " )\n", " if self.clock is None and self.render_mode == \"human\":\n", " self.clock = pygame.time.Clock()\n", "\n", " canvas = pygame.Surface((self.window_size, self.window_size))\n", " canvas.fill((255, 255, 255))\n", " pix_square_size = (\n", " self.window_size / self.size\n", " ) # The size of a single grid square in pixels\n", "\n", " # First we draw the target and the distractors\n", " pygame.draw.rect(\n", " canvas,\n", " (0, 255, 0),\n", " pygame.Rect(\n", " pix_square_size * self._target_location,\n", " (pix_square_size, pix_square_size),\n", " ),\n", " )\n", " pygame.draw.rect(\n", " canvas,\n", " (255, 0, 0),\n", " pygame.Rect(\n", " pix_square_size * self._distractor1_location,\n", " (pix_square_size, pix_square_size),\n", " ),\n", " )\n", " pygame.draw.rect(\n", " canvas,\n", " (255, 0, 0),\n", " pygame.Rect(\n", " pix_square_size * self._distractor2_location,\n", " (pix_square_size, pix_square_size),\n", " ),\n", " )\n", "\n", " # Now we draw the agent\n", " pygame.draw.circle(\n", " canvas,\n", " (0, 0, 255),\n", " (self._agent_location + 0.5) * pix_square_size,\n", " pix_square_size / 3,\n", " )\n", "\n", " # Add some gridlines\n", " for x in range(self.size + 1):\n", " pygame.draw.line(\n", " canvas,\n", " 0,\n", " (0, pix_square_size * x),\n", " (self.window_size, pix_square_size * x),\n", " width=3,\n", " )\n", " pygame.draw.line(\n", " canvas,\n", " 0,\n", " (pix_square_size * x, 0),\n", " (pix_square_size * x, self.window_size),\n", " width=3,\n", " )\n", "\n", " # Print Q-values\n", " for x in range(self.size):\n", " for y in range(self.size):\n", " s = self._coordinate2state((x, y))\n", " \n", " # Up\n", " val = f\"{self.Q[s, 3]:+.2f}\"\n", " text = self.font.render(val, True, (0, 0, 0))\n", " canvas.blit(text, \n", " ((x + 0.5) * pix_square_size - 6, \n", " (y) * pix_square_size + 6)\n", " )\n", " # Down\n", " val = f\"{self.Q[s, 1]:+.2f}\"\n", " text = self.font.render(val, True, (0, 0, 0))\n", " canvas.blit(text, \n", " ((x + 0.5) * pix_square_size - 6, \n", " (y+1) * pix_square_size - 12)\n", " )\n", " # Left\n", " val = f\"{self.Q[s, 2]:+.2f}\"\n", " text = self.font.render(val, True, (0, 0, 0))\n", " canvas.blit(text, \n", " ((x) * pix_square_size + 6, \n", " (y+ 0.5) * pix_square_size - 6)\n", " )\n", " # Right\n", " val = f\"{self.Q[s, 0]:+.2f}\"\n", " text = self.font.render(val, True, (0, 0, 0))\n", " canvas.blit(text, \n", " ((x + 1) * pix_square_size - 32, \n", " (y+ 0.5) * pix_square_size - 6)\n", " )\n", "\n", "\n", " if self.render_mode == \"human\":\n", " # The following line copies our drawings from `canvas` to the visible window\n", " self.window.blit(canvas, canvas.get_rect())\n", " pygame.event.pump()\n", " pygame.display.update()\n", "\n", " # We need to ensure that human-rendering occurs at the predefined framerate.\n", " # The following line will automatically add a delay to keep the framerate stable.\n", " self.clock.tick(self.metadata[\"render_fps\"])\n", "\n", " elif self.render_mode == \"rgb_array\":\n", " return np.transpose(\n", " np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)\n", " )\n", " elif self.render_mode == \"rgb_array_list\":\n", " array = np.transpose(\n", " np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)\n", " )\n", " self._frames.append(array)\n", "\n", " def close(self):\n", " if self.window is not None:\n", " pygame.display.quit()\n", " pygame.quit()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class RandomAgent:\n", " \n", " def __init__(self, env):\n", " self.env = env\n", " self.Q = np.zeros((self.env.observation_space.n, self.env.action_space.n))\n", " \n", " def act(self, state):\n", " \"Selects an action randomly\"\n", " return self.env.action_space.sample()\n", " \n", " def train(self, nb_episodes, recorder=None):\n", " \"Runs the agent on the environment for nb_episodes.\"\n", " # Returns\n", " returns = []\n", " steps = []\n", "\n", " # Fixed number of episodes\n", " for episode in range(nb_episodes):\n", "\n", " # Reset\n", " state, info = self.env.reset()\n", " done = False\n", " nb_steps = 0\n", "\n", " # Store rewards\n", " return_episode = 0.0\n", "\n", " # Sample the episode\n", " while not done:\n", " \n", " # Select an action \n", " action = self.act(state)\n", "\n", " # Perform the action\n", " next_state, reward, terminal, truncated, info = self.env.step(action)\n", " \n", " # Append reward\n", " return_episode += reward\n", "\n", " # Go in the next state\n", " state = next_state\n", "\n", " # Increment time\n", " nb_steps += 1\n", "\n", " # Terminal state\n", " done = terminal or truncated\n", " \n", " # Pass the Q table to the GUI\n", " self.env.Q = self.Q \n", "\n", " # Store info\n", " returns.append(return_episode)\n", " steps.append(nb_steps)\n", " \n", " return returns, steps" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the environment\n", "env = GridWorldEnv(render_mode='human')\n", "\n", "# Create the agent\n", "agent = RandomAgent(env)\n", "\n", "# Perform random episodes\n", "returns, steps = agent.train(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Adapt your Q-learning agent from last exercise to the problem. The main difference is the call to `self.env.Q = self.Q` so that the GUI displays the Q-values, the rest is similar. Train it for 100 episodes with the right hyperparameters and without rendering." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Train a Q-learning agent with rendering on. Observe in particular which Q-values are updated when the agent reaches the target. Is it efficient?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Modify your agent so that it uses **softmax action selection**, with a temperature $\\tau = 1.0$ and a suitable decay. What does it change?\n", "\n", "If you have time, write a generic class for the Q-learning agent where you can select the action selection method flexibly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Eligibility traces\n", "\n", "The main drawback of Q-learning is that it needs many episodes to converge (**sample complexity**).\n", "\n", "One way to speed up learning is to use eligibility traces, one per state-action pair:\n", "\n", "```python\n", "traces = np.zeros((nb_states, nb_actions))\n", "```\n", "\n", "After each transition $(s_t, a_t)$, Q($\\lambda$) updates a **trace** $e(s_t, a_t)$ and modifies all Q-values as:\n", "\n", "1. The trace of the last transition is incremented from 1:\n", " \n", "$$e(s_t, a_t) = e(s_t, a_t) +1$$\n", " \n", "2. Q($\\lambda$)-learning is applied on **ALL** Q-values, using the TD error at time $t$:\n", " \n", "$$Q(s, a) = Q(s, a) + \\alpha \\, (r_{t+1} + \\gamma \\, \\max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)) \\, e(s, a)$$\n", " \n", "3. All traces are exponentially decreased using the trace parameter $\\lambda$ (e.g. 0.7):\n", "\n", "$$\n", "e(s, a) = \\lambda \\, \\gamma \\, e(s, a)\n", "$$\n", "\n", "All traces are reset to 0 at the beginning of an episode.\n", "\n", "**Q:** Implement eligibility traces in your Q($\\lambda$)-learning agent and see if it improves convergence. Train it with rendering on and observe how all Q-values are updated." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Vary the trace parameter $\\lambda$ and discuss its influence." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Q:** Increase the size of Gridworld to 100x100 and observe how long it takes to learn the optimal strategy using eligibility traces or not.\n", "\n", "```python\n", "env = GridWorldEnv(size=100)\n", "```\n", "\n", "Comment on the **curse of dimensionality** and the interest of tabular RL for complex tasks with large state spaces and sparse rewards (e.g. robotics)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.13 ('deeprl')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" }, "vscode": { "interpreter": { "hash": "932956c8e5d2f79d68ff59e849758b6e4ddbf01f7f22c7d8bb3532c38341d908" } } }, "nbformat": 4, "nbformat_minor": 4 }