{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Q-learning" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "try:\n", "    import google.colab\n", "    IN_COLAB = True\n", "except ImportError:\n", "    IN_COLAB = False\n", "\n", "if IN_COLAB:\n", "    !pip install -U gymnasium pygame moviepy\n", "    !pip install gymnasium[box2d]" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "gym version: 0.26.3\n" ] } ], "source": [ "import numpy as np\n", "rng = np.random.default_rng()\n", "import matplotlib.pyplot as plt\n", "import os\n", "\n", "import gymnasium as gym\n", "print(\"gym version:\", gym.__version__)\n", "\n", "from moviepy.editor import ImageSequenceClip, ipython_display\n", "\n", "class GymRecorder(object):\n", "    \"\"\"\n", "    Simple wrapper over moviepy to generate a .gif with the frames of a gym environment.\n", "\n", "    The environment must have the render_mode `rgb_array_list`.\n", "    \"\"\"\n", "    def __init__(self, env):\n", "        self.env = env\n", "        self._frames = []\n", "\n", "    def record(self, frames):\n", "        \"To be called at the end of an episode.\"\n", "        for frame in frames:\n", "            self._frames.append(np.array(frame))\n", "\n", "    def make_video(self, filename):\n", "        \"Generates the gif video.\"\n", "        directory = os.path.dirname(os.path.abspath(filename))\n", "        # Create the output directory (and any missing parents) if needed\n", "        os.makedirs(directory, exist_ok=True)\n", "        self.clip = ImageSequenceClip(list(self._frames), fps=self.env.metadata[\"render_fps\"])\n", "        self.clip.write_gif(filename, fps=self.env.metadata[\"render_fps\"], loop=0)\n", "        del self._frames\n", "        self._frames = []\n", "\n", "def running_average(x, N):\n", "    kernel = np.ones(N) / N\n", "    return np.convolve(x, kernel, mode='same')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this short exercise, we are going to apply **Q-learning** to the Taxi environment used last time for MC 
control.\n", "\n", "As a reminder, Q-learning updates the Q-value of a state-action pair **after each transition**, using the update rule:\n", "\n", "$$\\Delta Q(s_t, a_t) = \\alpha \\, (r_{t+1} + \\gamma \\, \\max_{a'} \\, Q(s_{t+1}, a') - Q(s_t, a_t))$$\n", "\n", "**Q:** Update the class you designed for online MC in the last exercise so that it implements Q-learning.\n", "\n", "The main difference is that the `update()` method has to be called after each step of the episode, not at the end. This also simplifies the code considerably (no need to iterate backwards over the episode).\n", "\n", "You can start with the following parameters, but feel free to change them:\n", "\n", "* Discount factor $\\gamma = 0.9$.\n", "* Learning rate $\\alpha = 0.1$.\n", "* Epsilon-greedy action selection, with an initial exploration parameter of 1.0 and an exponential decay rate of $10^{-5}$ applied after each update (i.e. $\\epsilon \\leftarrow \\epsilon \\, (1 - 10^{-5})$ at every step!).\n", "* 20000 training episodes.\n", "\n", "Keep the general structure of the class: `train()` for the main loop, `test()` to run one episode without exploration, etc.\n", "\n", "Plot the training and test performance at the end and render the learned deterministic policy for one episode.\n", "\n", "*Note:* if $s_{t+1}$ is terminal (`done` is true after the transition), the target should not be $r_{t+1} + \\gamma \\, \\max_{a'} \\, Q(s_{t+1}, a')$, but simply $r_{t+1}$, as there is no next action." 
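, "\n", "\n", "*Worked example (illustrative numbers, not taken from an actual Taxi run):* suppose $Q(s_t, a_t) = 2$, $r_{t+1} = -1$ and $\\max_{a'} \\, Q(s_{t+1}, a') = 5$. With $\\gamma = 0.9$ and $\\alpha = 0.1$, the update rule gives:\n", "\n", "$$\\Delta Q(s_t, a_t) = 0.1 \\, (-1 + 0.9 \\times 5 - 2) = 0.1 \\times 1.5 = 0.15$$\n", "\n", "so the Q-value moves from 2 to 2.15. If $s_{t+1}$ were terminal instead, the target would reduce to $r_{t+1} = -1$ and the update would be $\\Delta Q(s_t, a_t) = 0.1 \\, (-1 - 2) = -0.3$.\n", "\n", "Similarly, assuming the decay is applied multiplicatively at every update, the exploration parameter after $n$ steps is $\\epsilon_n = (1 - 10^{-5})^n \\approx e^{-10^{-5} \\, n}$, i.e. roughly 0.37 after 100000 steps." 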
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "class QLearningAgent:\n", "    \"\"\"\n", "    Q-learning agent.\n", "    \"\"\"\n", "\n", "    def __init__(self, env, gamma, epsilon, decay_epsilon, alpha):\n", "        \"\"\"\n", "        :param env: gym-like environment\n", "        :param gamma: discount factor\n", "        :param epsilon: exploration parameter\n", "        :param decay_epsilon: exploration decay parameter\n", "        :param alpha: learning rate\n", "        \"\"\"\n", "        self.env = env\n", "        self.gamma = gamma\n", "        self.epsilon = epsilon\n", "        self.decay_epsilon = decay_epsilon\n", "        self.alpha = alpha\n", "\n", "        # Q-table: one row per state, one column per action\n", "        self.Q = np.zeros([self.env.observation_space.n, self.env.action_space.n])\n", "\n", "    def act(self, state):\n", "        \"Returns an action using epsilon-greedy action selection.\"\n", "\n", "        # Greedy action, with ties broken at random\n", "        action = rng.choice(np.where(self.Q[state, :] == self.Q[state, :].max())[0])\n", "\n", "        # Random action with probability epsilon\n", "        if rng.random() < self.epsilon:\n", "            action = self.env.action_space.sample()\n", "\n", "        return action\n", "\n", "    def update(self, state, action, reward, next_state, done):\n", "        \"Updates the agent using a single transition.\"\n", "\n", "        # Bellman target\n", "        target = reward\n", "\n", "        # Bootstrap only if the episode continues\n", "        if not done:\n", "            target += self.gamma * self.Q[next_state, :].max()\n", "\n", "        # Update the Q-value\n", "        self.Q[state, action] += self.alpha * (target - self.Q[state, action])\n", "\n", "        # Decay epsilon\n", "        self.epsilon = self.epsilon * (1 - self.decay_epsilon)\n", "\n", "\n", "    def train(self, nb_episodes, recorder=None):\n", "        \"Runs the agent on the environment for nb_episodes. Returns the lists of episode returns and episode lengths.\"\n", "\n", "        # Returns\n", "        returns = []\n", "        steps = []\n", "\n", "        # Fixed number of episodes\n", "        for episode in range(nb_episodes):\n", "\n", "            # Reset\n", "            state, info = self.env.reset()\n", "            done = False\n", "            nb_steps = 0\n", "\n", "            # Store rewards\n", "            return_episode = 0.0\n", "\n", "            # Sample the episode\n", "            while not done:\n", "\n", "                # Select an action\n", "                action = self.act(state)\n", "\n", "                # Perform the action\n", "                next_state, reward, terminal, truncated, info = self.env.step(action)\n", "\n", "                # End of the episode\n", "                done = terminal or truncated\n", "\n", "                # Learn from the transition\n", "                self.update(state, action, reward, next_state, done)\n", "\n", "                # Go in the next state\n", "                state = next_state\n", "\n", "                # Increment time\n", "                nb_steps += 1\n", "                return_episode += reward\n", "\n", "\n", "            # Record at the end of the last episode\n", "            if recorder is not None and episode == nb_episodes - 1:\n", "                recorder.record(self.env.render())\n", "\n", "            # Store info\n", "            returns.append(return_episode)\n", "            steps.append(nb_steps)\n", "\n", "\n", "        return returns, steps\n", "\n", "    def test(self, recorder=None):\n", "        \"Performs a test episode without exploration.\"\n", "        previous_epsilon = self.epsilon\n", "        self.epsilon = 0.0\n", "\n", "        # Reset\n", "        state, info = self.env.reset()\n", "        done = False\n", "        nb_steps = 0\n", "        return_episode = 0.0\n", "\n", "        # Sample the episode\n", "        while not done:\n", "            action = self.act(state)\n", "            next_state, reward, terminal, truncated, info = self.env.step(action)\n", "            done = terminal or truncated\n", "            return_episode += reward\n", "            state = next_state\n", "            nb_steps += 1\n", "\n", "        # Restore the exploration parameter\n", "        self.epsilon = previous_epsilon\n", "\n", "        if recorder is not None:\n", "            recorder.record(self.env.render())\n", "\n", "        return return_episode, nb_steps" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": 
[ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Parameters\n", "gamma = 0.9\n", "epsilon = 1.0\n", "decay_epsilon = 1e-5\n", "alpha = 0.1\n", "nb_episodes = 20000\n", "\n", "# Create the environment\n", "env = gym.make(\"Taxi-v3\")\n", "\n", "# Create the agent\n", "agent = QLearningAgent(env, gamma, epsilon, decay_epsilon, alpha)\n", "\n", "# Train the agent \n", "returns, steps = agent.train(nb_episodes)\n", "\n", "# Plot training returns\n", "plt.figure(figsize=(15, 6))\n", "plt.subplot(121)\n", "plt.plot(returns)\n", "plt.plot(running_average(returns, 1000))\n", "plt.xlabel(\"Episodes\")\n", "plt.ylabel(\"Returns\")\n", "plt.subplot(122)\n", "plt.plot(steps)\n", "plt.plot(running_average(steps, 1000))\n", "plt.xlabel(\"Episodes\")\n", "plt.ylabel(\"steps\")\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Test performance 7.9\n" ] }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Test the agent for 1000 episodes\n", "test_returns = []\n", "test_steps = []\n", "for episode in range(1000):\n", " return_episode, nb_steps = agent.test()\n", " test_returns.append(return_episode)\n", " test_steps.append(nb_steps)\n", "print(\"Test performance\", np.mean(test_returns))\n", "\n", "plt.figure(figsize=(15, 6))\n", "plt.subplot(121)\n", "plt.hist(test_returns)\n", "plt.xlabel(\"Returns\")\n", "plt.subplot(122)\n", "plt.hist(test_steps)\n", "plt.xlabel(\"Number of steps\")\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "MoviePy - Building file videos/taxi-trained-td.gif with imageio.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/html": [ "