{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "UxK08xN_wco-" }, "source": [ "# HW5: Dyna-Q\n", "\n", "> - Full Name: **[Full Name]**\n", "> - Student ID: **[Stundet ID]**\n", "\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/DeepRLCourse/Homework-5-Questions/blob/main/RL_HW5_Dyna.ipynb)\n", "[![Open In kaggle](https://kaggle.com/static/images/open-in-kaggle.svg)](https://kaggle.com/kernels/welcome?src=https://raw.githubusercontent.com/DeepRLCourse/Homework-5-Questions/main/RL_HW5_Dyna.ipynb)\n", "\n", "## Overview\n", "Here the goal is to implement **Dyna-Q** for [gymnasium environments](https://gymnasium.farama.org/).\n", "More specificly we focus on the [Frozen Lake](https://gymnasium.farama.org/environments/toy_text/frozen_lake/) environment and try to solve its variants (different map sizes, slippery/non-slippery, etc).\n", "\n", "No matter which setting you choose the plotting functions and algorithms don't need any alteration!\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "SBD13rNwmXpx" }, "outputs": [], "source": [ "# @title Imports\n", "\n", "# Stuff you (might) need\n", "import random\n", "import numpy as np\n", "import gymnasium as gym\n", "from tqdm.notebook import trange\n", "from heapq import heappush, heappop\n", "from collections import defaultdict\n", "\n", "# Stuff used for visualization\n", "import matplotlib\n", "from matplotlib import pyplot as plt\n", "import matplotlib.patches as patches\n", "from matplotlib.gridspec import GridSpec\n", "\n", "import base64\n", "import imageio\n", "import IPython\n", "\n", "import seaborn as sns" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "Avcbl8VMmgm1" }, "outputs": [], "source": [ "# @title Visualization Functions\n", "\n", "def embed_mp4(filename):\n", " video = open(filename,'rb').read()\n", " b64 = base64.b64encode(video)\n", " tag = '''\n", " '''.format(b64.decode())\n", "\n", " return IPython.display.HTML(tag)\n", "\n", "\n", "def create_policy_eval_video(env, policy, filename, Q=None, num_episodes=1):\n", " filename = filename + \".mp4\"\n", " with imageio.get_writer(filename, fps=env.metadata['render_fps']) as video:\n", " for _ in range(num_episodes):\n", " state, info = env.reset()\n", " video.append_data(env.render())\n", " while True:\n", " action = policy(state, Q)\n", " state, reward, terminated, truncated, info = env.step(action)\n", " video.append_data(env.render())\n", " if terminated or truncated:\n", " break\n", " return embed_mp4(filename)\n", "\n", "\n", "def plot_rewards(rewards, average_range=None, ax=None, show=False):\n", " if ax is None:\n", " fig, ax = plt.subplots()\n", " xs = range(1, len(rewards) + 1)\n", " ax.plot(xs, rewards, marker='o', linestyle='--', label='Episode Reward')\n", " ax.plot(xs, np.cumsum(rewards) / xs, marker='x', label='Cumulative Average')\n", " ax.legend()\n", " ax.set(xlabel='Episodes', ylabel='Total Reward', title='Episode Rewards')\n", " if show:\n", " plt.show()\n", "\n", "\n", "def plot_heatmap(env, value, color_terminal_states=True, ax=None, show=False):\n", " # Generate heatmap showing maximum value at each state\n", " if ax is None:\n", " fig, ax = plt.subplots()\n", " dim_x, dim_y = env.unwrapped.desc.shape\n", " action_max = value.argmax(axis=1)\n", " value_max = value.max(axis=1).reshape(dim_y, dim_x)\n", " act_dict = {0: 'U', 1: 'R', 2: 'D', 3: 'L'}\n", " act_dict = {0: '←', 1: '↓', 2: '→', 3: '↑'}\n", " labels = np.array([act_dict.get(action, '') for action in action_max])\n", " for i in range(dim_x * dim_y):\n", " if env.unwrapped.desc[i // dim_x, i % dim_x] == b'H':\n", " labels[i] = 'H'\n", " if env.unwrapped.desc[i // dim_x, i % dim_x] == b'G':\n", " labels[i] = 'G'\n", " labels = labels.reshape(dim_y, dim_x)\n", " im = sns.heatmap(value_max, cmap=\"RdYlGn\", annot=labels, annot_kws={'fontsize': 16}, fmt='s')\n", "\n", " if color_terminal_states:\n", " for i in range(dim_x * dim_y):\n", " if env.unwrapped.desc[i // dim_x, i % dim_x] == b'H':\n", " ax.add_patch(plt.Rectangle((i % dim_x, i // dim_x), 1, 1, color='black'))\n", " if env.unwrapped.desc[i // dim_x, i % dim_x] == b'G':\n", " ax.add_patch(plt.Rectangle((i % dim_x, i // dim_x), 1, 1, color='purple'))\n", "\n", " ax.set(title='Maximum Value per State')\n", " ax.set_xticks(np.linspace(0.5, dim_x-0.5, num=dim_x))\n", " ax.set_xticklabels([\"%d\" % x for x in np.arange(dim_x)])\n", " ax.set_yticks(np.linspace(0.5, dim_y-0.5, num=dim_y))\n", " ax.set_yticklabels([\"%d\" % y for y in np.arange(dim_y)], rotation='horizontal')\n", " if show:\n", " plt.show()\n", " return im\n", "\n", "\n", "def get_color_for_value(value, vmin, vmax):\n", " # Normalize the value between 0 and 1\n", " norm_value = (value - vmin) / (vmax - vmin)\n", " # Get color from a colormap\n", " colormap = plt.cm.RdYlGn # You can choose any other colormap\n", " return colormap(norm_value)\n", "\n", "\n", "def plot_q_values_grid(q_values, env, square_size=1, color_terminal_states=True, ax=None, show=False):\n", " grid = env.unwrapped.desc\n", " rows, cols = grid.shape\n", " q_values = q_values.reshape(rows, cols, 4)\n", "\n", " if ax is None:\n", " fig, ax = plt.subplots()\n", "\n", " # Determine the range of Q-values for normalization\n", " vmin = np.min(q_values)\n", " vmax = np.max(q_values)\n", "\n", " # Actions correspond to directions: 0=left, 1=right, 2=down, 3=up\n", " actions = {0: 'left', 2: 'right', 3: 'down', 1: 'up'}\n", "\n", " # Loop through each position in the grid\n", " for i in range(rows):\n", " for j in range(cols):\n", " # Calculate the lower-left corner of the square\n", " x = j * square_size\n", " y = i * square_size\n", "\n", " # Define the corners of the square\n", " bl = (x, y) # bottom-left\n", " br = (x + square_size, y) # bottom-right\n", " tl = (x, y + square_size) # top-left\n", " tr = (x + square_size, y + square_size) # top-right\n", "\n", " # Get Q-values for current state (i, j)\n", " q_left = q_values[i, j, 0]\n", " q_up = q_values[i, j, 1]\n", " q_right = q_values[i, j, 2]\n", " q_down = q_values[i, j, 3]\n", "\n", " # Check if the current cell is the one to be colored black\n", " if color_terminal_states and grid[i, j] == b'H':\n", " edge_color = None\n", " left_color, right_color, down_color, up_color = ['black'] * 4\n", " elif color_terminal_states and grid[i, j] == b'G':\n", " edge_color = None\n", " left_color, right_color, down_color, up_color = ['purple'] * 4\n", " else:\n", " edge_color = 'black'\n", " left_color = get_color_for_value(q_left, vmin, vmax)\n", " right_color = get_color_for_value(q_right, vmin, vmax)\n", " down_color = get_color_for_value(q_down, vmin, vmax)\n", " up_color = get_color_for_value(q_up, vmin, vmax)\n", "\n", " # Draw and color the triangles based on Q-values\n", " triangle_left = patches.Polygon([bl, tl, (x + square_size/2, y + square_size/2)], closed=True,\n", " edgecolor=edge_color, facecolor=left_color)\n", " triangle_right = patches.Polygon([br, tr, (x + square_size/2, y + square_size/2)], closed=True,\n", " edgecolor=edge_color, facecolor=right_color)\n", " triangle_down = patches.Polygon([bl, br, (x + square_size/2, y + square_size/2)], closed=True,\n", " edgecolor=edge_color, facecolor=down_color)\n", " triangle_up = patches.Polygon([tl, tr, (x + square_size/2, y + square_size/2)], closed=True,\n", " edgecolor=edge_color, facecolor=up_color)\n", "\n", " ax.add_patch(triangle_left)\n", " ax.add_patch(triangle_right)\n", " ax.add_patch(triangle_down)\n", " ax.add_patch(triangle_up)\n", "\n", " # Set the title\n", " ax.set(title='Action-State Values')\n", "\n", " # Set the limits of the plot\n", " ax.set_xlim(0, cols * square_size)\n", " ax.set_ylim(0, rows * square_size)\n", "\n", " # Set aspect of the plot to be equal\n", " ax.set_aspect('equal')\n", "\n", " # Disable ticks on both axes\n", " ax.set_xticks([]) # Disable x ticks\n", " ax.set_yticks([]) # Disable y ticks\n", "\n", " ax.invert_yaxis() # Optional: Invert y-axis to have (0,0) at the top-left corner\n", " plt.colorbar(plt.cm.ScalarMappable(norm=plt.Normalize(vmin=vmin, vmax=vmax), cmap='RdYlGn'), ax=ax, label='Q-value')\n", " if show:\n", " plt.show()\n", "\n", "\n", "def plot_performance(env, value, reward_sums):\n", " # Create a figure\n", " fig = plt.figure(figsize=(16, 12), dpi=200)\n", "\n", " # Define a GridSpec with 2 rows and 2 columns\n", " gs = GridSpec(2, 2, figure=fig)\n", "\n", " # Plot in the first row spanning both columns\n", " ax1 = fig.add_subplot(gs[0, :])\n", " plot_rewards(reward_sums, ax=ax1)\n", "\n", " # Plot in the second row\n", " ax2 = fig.add_subplot(gs[1, 0])\n", " plot_q_values_grid(value, env, ax=ax2)\n", "\n", " ax3 = fig.add_subplot(gs[1, 1])\n", " plot_heatmap(env, value, ax=ax3)\n", "\n", " # Show the plots\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": { "id": "qTS3Lm1nWiMK" }, "source": [ "# Explore the Environment (5 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "yyCz0lxFWsFv" }, "source": [ "First we must create the environment.\n", "In order to be able to render the environment we use `render_mode=\"rgb_array\"` as the render mode.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xZI6WO4bmgjS" }, "outputs": [], "source": [ "slippery = True\n", "env = gym.make(\"FrozenLake-v1\", render_mode=\"rgb_array\", is_slippery=slippery)\n", "\n", "# TODO: Print the observation space and action space\n", "print('Observations:', ...)\n", "print('Actions:', ...)" ] }, { "cell_type": "markdown", "metadata": { "id": "-8SUlhUHWvbC" }, "source": [ "Define a random policy that randomly selects an action from the action space." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "mVMkkYiRmghQ" }, "outputs": [], "source": [ "def random_policy(*args):\n", " # TODO: Select a random action\n", " action = ...\n", " return action" ] }, { "cell_type": "markdown", "metadata": { "id": "Qj7aYuG-W1Rz" }, "source": [ "Visualize the random policy." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ef47COnKmgfJ" }, "outputs": [], "source": [ "create_policy_eval_video(env, random_policy, 'random_policy', num_episodes=5)" ] }, { "cell_type": "markdown", "metadata": { "id": "ErwP00DkW_L6" }, "source": [ "# Policies (5 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "RXvYKQBlXAet" }, "source": [ "First define a greedy policy that takes a state $s$ and returns the action $a$ with the highest $Q(s,a)$." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gK_kPk4qmgXR" }, "outputs": [], "source": [ "def greedy_policy(state: int, q_values: np.ndarray) -> int:\n", " # TODO: Select a greedy action\n", " action = ...\n", " return action" ] }, { "cell_type": "markdown", "metadata": { "id": "L3YviOabXQrC" }, "source": [ "Now define a policy that selects a greedy action with probability $1-\\varepsilon$ and selects a random action with probability $\\varepsilon$, for given a constant $\\varepsilon$." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CMjGEHGbmgAK" }, "outputs": [], "source": [ "def epsilon_greedy_policy(state: int, q_values: np.ndarray, epsilon: float) -> int:\n", " # TODO: Select an epsilon-greedy action\n", " action = ...\n", " return action" ] }, { "cell_type": "markdown", "metadata": { "id": "Z3JHKr8lWFh0" }, "source": [ "# Dyna-Q (25 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "eWEMLsQizR1-" }, "source": [ "**Dyna-Q** combines model-free and model-based reinforcement learning by integrating direct learning from experience with planning from a learned model.\n", "Unlike **Q-learning**, which only updates the action values based on real experiences, **Dyna-Q** uses a learned model of the environment to simulate additional experiences and update the action values, accelerating the learning process.\n", "\n", "The environment we are working with is *deterministic* when `is_slippery=False`.\n", "For this reason we use also use a *deterministic* model for the environment.\n", "In order to see how **Dyna-Q** gets implemented for *stochastic* environmnets, check out the workshop!" ] }, { "cell_type": "markdown", "metadata": { "id": "3Z0SubSLe6in" }, "source": [ "## Planning (10 points)\n", "\n", "In addition to this update from real experiences, **Dyna-Q** updates the action values using simulated experiences:\n", "\n", "\n", "$$Q(s, a) \\leftarrow Q(s, a) + \\alpha \\left(\\hat{r} + \\gamma \\max_{a'} Q(\\hat{s}, a') - Q(s, a)\\right)$$\n", "\n", "\n", "Complete the `q_planning` function to perform `n` steps of planning." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "I7cZXjeDe8-K" }, "outputs": [], "source": [ "def q_planning(model: dict, q: np.ndarray, alpha: float, gamma: float, n: int) -> np.ndarray:\n", "\n", " # TODO: Perform n steps of planning\n", "\n", " # TODO: Randomly sample a known state-action pair\n", "\n", " # TODO: Get the predicted reward and next_state from the model\n", "\n", " # TODO: Update Q-value using the deterministic transition\n", "\n", " return q" ] }, { "cell_type": "markdown", "metadata": { "id": "u_xKveK4e9o2" }, "source": [ "## Learning (15 points)\n", "\n", "For `n_episodes`, take ɛ-greedy actions based on the Q-values and at each step update them:\n", "\n", "$$Q(s_t, a_t) \\leftarrow Q(s_t, a_t) + \\alpha \\left(r_t + \\gamma \\max_{a} Q(s_{t+1}, a) - Q(s_t, a_t)\\right)$$\n", "\n", "After each step you must also update the *deterministic* model and call `q_planning` to perform the planning steps." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "FOS3Ahi91kWX" }, "outputs": [], "source": [ "def dyna_q(n_episodes: int, env: gym.Env, epsilon: float, alpha: float,\n", " gamma: float, n: int) -> tuple[np.ndarray, np.ndarray]:\n", " \"\"\"Dyna-Q algorithm for deterministic environments.\"\"\"\n", "\n", " reward_sums = np.zeros(n_episodes)\n", " q = np.zeros((env.observation_space.n, env.action_space.n))\n", "\n", " # Dyna-Q model for deterministic environments\n", " model = defaultdict(dict)\n", "\n", " for episode_i in (pbar := trange(n_episodes, leave=False)):\n", " state, info = env.reset()\n", " reward_sum, terminal = 0, False\n", "\n", " while not terminal:\n", " # TODO: Take ɛ-greedy action\n", "\n", " # TODO: Q-learning update\n", "\n", " # TODO: Update deterministic model\n", "\n", " # TODO: Planning step(s)\n", "\n", " # TODO: Move to next state\n", "\n", " # TODO: Update reward sum\n", "\n", " pbar.set_description(f'Episode Reward {int(reward_sum)}')\n", " reward_sums[episode_i] = reward_sum\n", "\n", " return q, reward_sums" ] }, { "cell_type": "markdown", "metadata": { "id": "PjWksLHDfVnw" }, "source": [ "# Experiments (15 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "3JBCHFHmfkqf" }, "source": [ "Here you can run a bunch of experiments.\n", "You can use the given template for your experiments.\n", "\n", "If you have trouble getting started, work with a smaller map instead by setting `map_name=\"4x4\"` but keep in mind the ultimate goal of this assignment is solving the larger `8x8` grid.\n", "Using a simpler environment could help you better understand the limitations and motivate you to come up with solutions.\n", "\n", "After playing with the hyperparameters you should answer the following questions:\n", "\n", "* How does increasing the number of planning steps affect the overall learning process?\n", "* What would happen if we set `is_slippery=True` assuming we **didn't** change the *deterministic* nature of our algorithm?\n", "* Does planning even help for this specific environment? How so? (Hint: analyze the reward signal)\n", "* Assuming it takes $N_1$ episodes to reach the goal for the first time, and from then it takes $N_2$ episodes to reach the goal for the second time, explain how the number of planning steps `n` affects $N_1$ and $N_2$.\n", "\n", "`Your Answers:`\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "tIr6-vt52F6s" }, "outputs": [], "source": [ "# set for reproducibility\n", "np.random.seed(2025)\n", "\n", "# parameters needed by our policy and learning rule\n", "params = {'epsilon': ..., # epsilon-greedy policy\n", " 'alpha': ..., # learning rate\n", " 'gamma': ..., # temporal discount factor\n", " 'n': ..., # number of planning steps\n", "}\n", "\n", "# episodes/trials\n", "n_episodes = ...\n", "\n", "# environment initialization\n", "env = gym.make(\"FrozenLake-v1\", map_name=\"8x8\", is_slippery=False)\n", "\n", "# Solve Frozen Lake using Dyna-Q\n", "value_dyna_q, reward_sums_dyna_q = dyna_q(n_episodes, env, **params)\n", "\n", "# Plot the results\n", "plot_performance(env, value_dyna_q, reward_sums_dyna_q)" ] }, { "cell_type": "markdown", "metadata": { "id": "GJHiYFIiX6OJ" }, "source": [ "# Are you having troubles? (15 points)\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "ogz5sjwsd_gp" }, "source": [ "Here are a number of options you can explore to see better results:\n", "\n", "1. Introduce a baseline for Q-values to speed up convergence during initialization.\n", "2. Change the value of $\\varepsilon$ as a function of episode progression (or something else) for better exploration strategies.\n", "3. Replace the $\\varepsilon$-greedy policy with a different policy that is better suited for this environment.\n", "4. Change the number of planning steps as a function of episode progression (or something else) for more efficient planning.\n", "\n", "In the last two sections of this assignment, we examine how **Reward Shaping** and **Prioritized Sweeping** can yield better results.\n", "Other than these two methods, you are more than welcome to introduce any other method to improve your results.\n", "\n", "You can choose any or none of these methods at your discretion.\n", "The only important detail is to be able to solve the environments.\n", "You **MUST** justify each method and how it helps with issues you were facing before.\n", "\n", "`Your Answer:`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vOfcvIwcclB0" }, "outputs": [], "source": [ "# TODO: Don't overwrite your previous experiments!\n", "'''\n", "Please leave a new code cell for each new experiment.\n", "In a markdown cell, explain the improvements seen and justify them.\n", "'''" ] }, { "cell_type": "markdown", "metadata": { "id": "79zFjc5_GD7M" }, "source": [ "# Reward Shaping (10 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "RLhIU08-GGUD" }, "source": [ "As you know, [Reward Function Design is Difficult](https://www.alexirpan.com/2018/02/14/rl-hard.html#reward-function-design-is-difficult).\n", "However there a few main principles than can help speed up *Temporal-Difference Learning*.\n", "Here we ask you to modify the reward function to achieve better results." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CQbzP7W4GPjV" }, "outputs": [], "source": [ "from gymnasium.envs.toy_text import FrozenLakeEnv\n", "\n", "class CustomFrozenLakeEnv(FrozenLakeEnv):\n", " def step(self, action):\n", " obs, reward, terminated, truncated, info = super().step(action)\n", " # Modify the reward calculation\n", " reward = self.custom_reward_function(obs, reward, terminated)\n", " return obs, reward, terminated, truncated, info\n", "\n", " def custom_reward_function(self, observation, reward, done):\n", " # TODO: Define your custom reward logic here\n", " custom_reward = ...\n", " return custom_reward" ] }, { "cell_type": "markdown", "metadata": { "id": "yJYC7m61o5GO" }, "source": [ "Experiment with the new environment and improved reward function.\n", "\n", "Explanin how your modifications improved the results.\n", "\n", "`Your Answer:`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "r86WNS7kI4iQ" }, "outputs": [], "source": [ "# set for reproducibility\n", "np.random.seed(2025)\n", "\n", "# parameters needed by our policy and learning rule\n", "params = {'epsilon': ..., # epsilon-greedy policy\n", " 'alpha': ..., # learning rate\n", " 'gamma': ..., # temporal discount factor\n", " 'n': ..., # number of planning steps\n", "}\n", "\n", "# episodes/trials\n", "n_episodes = ...\n", "\n", "# environment initialization\n", "env = CustomFrozenLakeEnv(map_name=\"8x8\", is_slippery=False)\n", "\n", "# Solve Frozen Lake using Dyna-Q\n", "value_dyna_q, reward_sums_dyna_q = dyna_q(n_episodes, env, **params)\n", "\n", "# Plot the results\n", "plot_performance(env, value_dyna_q, reward_sums_dyna_q)" ] }, { "cell_type": "markdown", "metadata": { "id": "BC8vDTUPWVn9" }, "source": [ "# Prioritized Sweeping (20 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "gdLbPB7bpY3A" }, "source": [ "In the Dyna agents presented in the preceding sections, simulated transitions are started in state-action pairs selected uniformly at random from all previously experienced pairs.\n", "But a uniform selection is usually not the best; planning can be much more efficient if simulated transitions and updates are focused on particular state-action pairs." ] }, { "cell_type": "markdown", "metadata": { "id": "KezMBcc6ulwl" }, "source": [ "## Priority Planning (10 points)\n", "\n", "The `q_planning_priority` function performs planning updates using **Prioritized Sweeping** by first checking if the priority queue is empty and then iterating for a specified number of planning steps.\n", "It pops the state-action pair with the highest priority from the queue, retrieves the deterministic transition, and updates the Q-value using the temporal-difference (TD) error.\n", "Next, it updates priorities for predecessor state-action pairs that transition to the current state, calculating their TD errors and pushing significant ones back into the priority queue." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "frRCmViEfdT2" }, "outputs": [], "source": [ "def q_planning_priority(model: dict, q: np.ndarray, priorities: list, alpha: float, gamma: float, n: int) -> np.ndarray:\n", " \"\"\"Performs planning updates using Prioritized Sweeping.\"\"\"\n", "\n", " # TODO: Perform n steps of planning\n", "\n", " # TODO: Sample the state-action pair with the highest priority\n", "\n", " # TODO: Retrieve deterministic transition\n", "\n", " # TODO: Update Q-value using the deterministic transition\n", "\n", " # TODO: Update priorities for predecessors\n", "\n", " return q" ] }, { "cell_type": "markdown", "metadata": { "id": "DkJ_4awivieI" }, "source": [ "## Learning with Prioritized Sweeping (10 points)\n", "\n", "Similar to the original `dyna_q` function, you need to implement the `dyna_q_priority`.\n", "This time you must also store priorities and use a threshold parameter $\\theta$ that determines whether a state-action pair should be added to the priority queue based on the magnitude of the temporal-difference (TD) error.\n", "\n", "Specifically, $\\theta$ is used to filter out small TD errors that are considered insignificant.\n", "When the absolute value of the TD error for a state-action pair is greater than $\\theta$, the pair is added to the priority queue.\n", "This ensures that only state-action pairs with significant TD errors are prioritized for planning updates." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ac-fusFoWWsl" }, "outputs": [], "source": [ "def dyna_q_priority(n_episodes: int, env: gym.Env, epsilon: float, alpha: float,\n", " gamma: float, n: int, theta: float) -> tuple[np.ndarray, np.ndarray]:\n", " \"\"\"Dyna-Q with Prioritized Sweeping algorithm for deterministic environments.\"\"\"\n", "\n", " reward_sums = np.zeros(n_episodes)\n", " q = np.zeros((env.observation_space.n, env.action_space.n))\n", "\n", " # Dyna-Q model for deterministic environments\n", " model = defaultdict(dict)\n", "\n", " # Priority queue for prioritized sweeping\n", " priorities = []\n", "\n", " for episode_i in (pbar := trange(n_episodes, leave=False)):\n", " state, info = env.reset()\n", " reward_sum, terminal = 0, False\n", "\n", " while not terminal:\n", " # TODO: Take ɛ-greedy action\n", "\n", " # TODO: Q-learning update\n", "\n", " # TODO: Update deterministic model\n", "\n", " # TODO: Update priority queue if the TD error is significant\n", "\n", " # TODO: Planning step with prioritized sweeping\n", "\n", " # TODO: Move to next state\n", "\n", " # TODO: Update reward sum\n", "\n", " pbar.set_description(f'Episode Reward {int(reward_sum)}')\n", " reward_sums[episode_i] = reward_sum\n", "\n", " return q, reward_sums" ] }, { "cell_type": "markdown", "metadata": { "id": "5WPptP1ywsey" }, "source": [ "# Final Experiments (5 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "Vu7UstPaxF5A" }, "source": [ "We're almost done!\n", "All that remains is for you to test **Prioritized Sweeping**." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "P4khh0loW0wR" }, "outputs": [], "source": [ "# set for reproducibility\n", "np.random.seed(2025)\n", "\n", "# parameters needed by our policy and learning rule\n", "params = {'epsilon': ..., # epsilon-greedy policy\n", " 'alpha': ..., # learning rate\n", " 'gamma': ..., # temporal discount factor\n", " 'n': ..., # number of planning steps\n", " 'theta': ... # prioritization threshold\n", "}\n", "\n", "# episodes/trials\n", "n_episodes = ...\n", "\n", "# environment initialization\n", "env = CustomFrozenLakeEnv(map_name=\"8x8\", is_slippery=False)\n", "\n", "# Solve Frozen Lake using Dyna-Q with Prioritized Sweeping\n", "value_dyna_q, reward_sums_dyna_q = dyna_q_priority(n_episodes, env, **params)\n", "\n", "# Plot the results\n", "plot_performance(env, value_dyna_q, reward_sums_dyna_q)" ] }, { "cell_type": "markdown", "metadata": { "id": "SK0au1A3zOWl" }, "source": [ "Visualize the learned policy." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DaG18CS5yi5d" }, "outputs": [], "source": [ "env = CustomFrozenLakeEnv(map_name=\"8x8\", render_mode=\"rgb_array\", is_slippery=False)\n", "create_policy_eval_video(env, greedy_policy, 'DynaQ', Q=value_dyna_q)" ] }, { "cell_type": "markdown", "metadata": { "id": "F-yqb2hCzRzT" }, "source": [ "Explain how these last improvements helped.\n", "\n", "`Your Answer:`" ] }, { "cell_type": "markdown", "metadata": { "id": "SG_sYybNDDRB" }, "source": [ "# Extra (10 points)" ] }, { "cell_type": "markdown", "metadata": { "id": "DUfuIWaVCby3" }, "source": [ "If this assignment was too easy for you, feel free to implement *Prioritized Sweeping* for stochastic environments and try to learn the `8x8` Frozen Lake with `is_slippery=True`.\n", "Use everything you've learned to achieve better results!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "qXbMNTrREK82" }, "outputs": [], "source": [ "# @title 🧊 Happy Coding 🤖\n" ] } ], "metadata": { "colab": { "collapsed_sections": [ "UxK08xN_wco-", "qTS3Lm1nWiMK", "ErwP00DkW_L6", "GJHiYFIiX6OJ" ], "provenance": [], "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }