{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "

# HW5: Monte Carlo Tree Search and Naive Search

\n", "\n", "> **Full Name:** `[Your Full Name]` \n", "> **Student ID:** `[Your Student ID]` \n", "\n", "

\n", "

\n", "\n", "---\n", "\n", "## 📌 Overview\n", "\n", "Welcome to **HW5**, where you will explore and implement search algorithms in reinforcement learning! \n", "In this assignment, you will:\n", "\n", "✅ Implement **Monte Carlo Tree Search (MCTS)** and a **Naive Search Algorithm**. \n", "✅ Develop an agent that utilizes these techniques for decision-making. \n", "✅ Test and evaluate the performance of your agent. \n", "\n", "### 📂 Notebook Structure:\n", "1️⃣ **Environment Definition** \n", "2️⃣ **Search Algorithms: Naive Search & MCTS** \n", "3️⃣ **Agent Implementation** \n", "4️⃣ **Training & Testing Loop** \n", "\n", "💡 *Each section includes explanations, docstrings, and `TODO` placeholders for missing implementations. Fill in the gaps and complete the assignment!* \n", "\n", "🚀 *Let's get started!* \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Detailed Explanation of the Overall Algorithm\n", "\n", "This notebook implements a MuZero-like approach to reinforcement learning, as described in the paper \n", "[“Mastering Atari, Go, Chess and Shogi by Planning with a Learned Model” (Schrittwieser et al., 2019)](https://arxiv.org/abs/1911.08265). \n", "Below is an overview of the key components and how they work together:\n", "\n", "---\n", "\n", "## 1. Representation, Dynamics, and Prediction Networks\n", "\n", "MuZero (and algorithms inspired by it) relies on three core neural networks:\n", "\n", "1. **Representation Network (`RepresentationNet`)** \n", " - **Goal**: Convert raw observations (e.g., game frames, board states) into a latent (hidden) state. \n", " - **Why It Matters**: By encoding high-dimensional inputs (like pixel images) into a more compact form, downstream modules can operate efficiently on this latent space rather than the raw data.\n", "\n", "2. **Dynamics Network (`DynamicsNet`)** \n", " - **Goal**: Given the current hidden state and an action, predict the **next hidden state** and an **immediate reward estimate**. 
\n", " - **Why It Matters**: This is the “model” portion of MuZero. It simulates how the hidden state changes when an action is taken, letting the algorithm plan forward without needing a hand-crafted or perfect simulator.\n", "\n", "3. **Prediction Network (`PredictionNet`)** \n", " - **Goal**: From a hidden state, predict:\n", " 1. **A policy distribution** (probabilities of selecting each possible action)\n", " 2. **A value estimate** (scalar measure of how good or bad the state is) \n", " - **Why It Matters**: This guides the search and learning, telling us which actions are promising (policy) and how favorable the current position might be (value).\n", "\n", "---\n", "\n", "## 2. Monte Carlo Tree Search (MCTS) and Naive Depth Search\n", "\n", "1. **Monte Carlo Tree Search (`MCTS` Class)** \n", " - **Core Idea**: Repeatedly simulate many “what if?” scenarios (rollouts) to gather statistics on which actions lead to higher value. \n", " - **Key Methods**: \n", " - `run(...)`: Manages the entire search process from the root hidden state. \n", " - `_expand_node(...)`: Expands a leaf node by calling the dynamics and prediction networks. \n", " - `_backpropagate(...)`: Propagates the newly obtained value estimates back up the search path. \n", " - `_calc_ucb(...)`: Calculates an Upper Confidence Bound (UCB) score to balance exploration (trying new actions) and exploitation (using known good actions). \n", " - `_compute_pi()`: Aggregates the visit counts of each child action at the root to form a final policy distribution. \n", "\n", "2. **Naive Depth Search (`naive_depth_search(...)`)** \n", " - **Core Idea**: Look ahead a fixed depth in a brute-force manner, expanding all action sequences up to that depth. \n", " - **Why It Matters**: Though less sophisticated than MCTS, it offers a simpler example of planning. It enumerates action branches, accumulates rewards and discounted values, and then picks the best sequence.\n", "\n", "---\n", "\n", "## 3. 
Buffer Replay\n", "\n", "- **Purpose**: Store entire trajectories (episodes) of experience, then allow sampling of smaller segments (sub-trajectories) for training. \n", "- **How**: \n", " 1. **`add_trajectories(...)`**: Inserts full episodes into the buffer. \n", " 2. **`sample_sub_trajectory(...)`**: Retrieves a sub-trajectory of length `k` (for an n-step return) from the stored episodes. \n", " 3. **`sample_batch(...)`**: Returns a batch of sub-trajectories for training the neural networks. \n", "\n", "---\n", "\n", "## 4. Agent\n", "\n", "- **Role**: Central coordinator that uses the networks (Representation, Dynamics, Prediction) and search algorithms (MCTS or naive) to choose actions. \n", "- **Key Functions**:\n", " 1. **`inference(...)`**: Given an observation, the agent converts it to a hidden state, then either:\n", " - Runs MCTS to derive a policy (if `search_type=\"mcts\"`)\n", " - Performs a naive depth search (if `search_type=\"naive\"`)\n", " - Or uses the direct policy from the `PredictionNet` (if no search is enabled) \n", " The chosen action, along with the policy distribution and value estimate, is returned.\n", " 2. **`initial_step(...)`**: Processes the initial raw observation into a hidden state and then obtains policy/value estimates. \n", " 3. **`rollout_step(...)`**: Given a hidden state plus actions, calls the `DynamicsNet` to get the next hidden state and reward, and the `PredictionNet` to predict policy/value at that new state.\n", "\n", "---\n", "\n", "## 5. Overall Training Procedure\n", "\n", "1. **Collect Trajectories**: \n", " - The agent interacts with the environment, using either MCTS or naive search to pick actions. \n", " - The environment returns observations/rewards that form trajectories (episodes).\n", "\n", "2. **Store in Replay**: \n", " - These full trajectories are added to the `BufferReplay`.\n", "\n", "3. 
**Sample Mini-Batches**: \n", " - The training script samples sub-trajectories from the buffer (via `sample_sub_trajectory(...)` or `sample_batch(...)`).\n", "\n", "4. **Unroll the Model**: \n", " - For each sampled sub-trajectory, the code unrolls the networks (`RepresentationNet`, `DynamicsNet`, `PredictionNet`) for multiple steps (up to `k`) to compare the model’s predictions to the actual transitions.\n", "\n", "5. **Compute Loss & Update**: \n", " - The total loss typically includes terms for:\n", " - **Value error** (predicted vs. actual return)\n", " - **Policy error** (predicted policy vs. search-derived/improved policy)\n", " - **Reward error** (predicted vs. actual immediate reward) \n", " - Backpropagation updates the parameters of all three networks end-to-end.\n", "\n", "6. **Repeat**: \n", " - Continue this cycle of data collection and network updates until convergence." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Library Imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0OFWJji-GcUt" }, "outputs": [], "source": [ "import numpy as np\n", "import gym\n", "import torch\n", "import torch.nn as nn\n", "import torch.optim as optim\n", "import torch.nn.functional as F\n", "import time\n", "import math\n", "\n", "import matplotlib.pyplot as plt\n", "from IPython.display import clear_output" ] }, { "cell_type": "code", "execution_count": 65, "metadata": { "id": "LeifZqBYOXbx" }, "outputs": [], "source": [ "device = torch.device(\"cuda\") if torch.cuda.is_available() else torch.device(\"cpu\")\n", "dtype = torch.float" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PcGuCbIzGg7w" }, "outputs": [], "source": [ "##############################################\n", "# Environment Runner\n", "##############################################\n", "class EnvironmentRunner:\n", "    \"\"\"\n", "    Manages interactions with an OpenAI Gym environment:\n", "    - Resets the environment\n", "    - Steps 
through it given an agent's actions\n", "    - Logs returns and gathers trajectory data\n", "    \"\"\"\n", "\n", "    def __init__(self, gym_env):\n", "        self.gym_env = gym_env\n", "        self.num_actions = self.gym_env.action_space.n\n", "\n", "        self.observation = self.gym_env.reset()\n", "        self.episodes_count = 0\n", "\n", "\n", "    def run(self, agent, show_render=False):\n", "        \"\"\"\n", "        Runs one full episode:\n", "        1) Resets environment\n", "        2) Steps until done\n", "        3) Logs returns\n", "        4) Returns the collected trajectory\n", "        \"\"\"\n", "        obs_list = []\n", "        actions_list = []\n", "        rewards_list = []\n", "        done_list = []\n", "        policy_list = []\n", "        values_list = []\n", "\n", "        # initial reset\n", "        self.observation = self.gym_env.reset()\n", "        obs_list.append(torch.tensor(self.observation))\n", "\n", "        finished = False\n", "        while not finished:\n", "            # Let agent infer the next action\n", "            chosen_action, distribution, val_est = agent.inference(\n", "                torch.tensor(self.observation, dtype=dtype, device=device)\n", "            )\n", "\n", "            next_obs, reward, finished, info = self.gym_env.step(chosen_action)\n", "\n", "            # Store\n", "            obs_list.append(torch.tensor(next_obs))\n", "            actions_list.append(chosen_action)\n", "            policy_list.append(torch.tensor(distribution))\n", "            values_list.append(val_est)\n", "            rewards_list.append(torch.tensor(reward))\n", "            done_list.append(finished)\n", "\n", "            # Optional render\n", "            if show_render:\n", "                self.gym_env.render()\n", "                time.sleep(0.024)\n", "\n", "            self.observation = next_obs\n", "\n", "        self.episodes_count += 1\n", "        return self.build_trajectory(obs_list, actions_list, rewards_list, done_list, policy_list, values_list)\n", "\n", "    @staticmethod\n", "    def build_trajectory(obs, acts, rews, finished_flags, pols, vals):\n", "        return {\n", "            \"obs\": obs,\n", "            \"actions\": acts,\n", "            \"rewards\": rews,\n", "            \"dones\": finished_flags,\n", "            \"pis\": pols,\n", "            \"vs\": vals,\n", "            \"length\": len(obs)\n", "        }\n", "\n", "class 
LoggingEnvRunner(EnvironmentRunner):\n", "    \"\"\"\n", "    Extends EnvironmentRunner to log episode returns.\n", "    \"\"\"\n", "    def __init__(self, env):\n", "        super().__init__(env)\n", "        self.episode_returns = []\n", "\n", "    def run(self, agent):\n", "        trajectory = super().run(agent)  # normal run\n", "        # Summation of the episode's rewards:\n", "        ep_return = float(np.sum(trajectory[\"rewards\"]))\n", "        self.episode_returns.append(ep_return)\n", "        return trajectory" ] }, { "cell_type": "code", "execution_count": 67, "metadata": { "id": "rJBube37GhXH" }, "outputs": [], "source": [ "##############################################\n", "# A simple Gym Wrapper\n", "##############################################\n", "class GymWrapper(gym.Wrapper):\n", "    \"\"\"\n", "    A wrapper that folds multiple historical observations\n", "    into a single stacked observation.\n", "    \"\"\"\n", "\n", "    def __init__(self, env, history_len):\n", "        super().__init__(env)\n", "        self.history_len = history_len\n", "        self.obs_dim = env.observation_space.shape[0]\n", "        self.num_actions = env.action_space.n\n", "\n", "    def reset(self):\n", "        self.episode_return = 0\n", "        self.observations_rollout = []\n", "\n", "        obs = self.env.reset()\n", "        self.observations_rollout.append(obs)\n", "\n", "        return self.aggregate()\n", "\n", "    def aggregate(self):\n", "        \"\"\"\n", "        Stacks the last `history_len` observations (zero-padded at the front\n", "        while the episode is shorter than `history_len`).\n", "        Returns them as a flattened array of shape (1, history_len * obs_dim).\n", "        \"\"\"\n", "        stacked = np.zeros((self.history_len, self.obs_dim))\n", "\n", "        current_length = len(self.observations_rollout)\n", "        if current_length == self.history_len:\n", "            stacked = np.array(self.observations_rollout)\n", "        else:\n", "            stacked[self.history_len - current_length :] = np.array(\n", "                self.observations_rollout\n", "            )\n", "\n", "        return stacked.flatten().reshape(1, -1)\n", "\n", "    def step(self, action):\n", "        next_obs, reward, done, info = self.env.step(action)\n", "        self.update_history(next_obs)\n", "        
aggregated_obs = self.aggregate()\n", "\n", "        self.episode_return += reward\n", "        if done:\n", "            info[\"return\"] = self.episode_return\n", "\n", "        return aggregated_obs, reward, done, info\n", "\n", "    def update_history(self, new_obs):\n", "        if len(self.observations_rollout) == self.history_len:\n", "            # drop oldest\n", "            self.observations_rollout = self.observations_rollout[1:]\n", "        self.observations_rollout.append(new_obs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# `BufferReplay` Class\n", "\n", "This class manages a replay buffer for reinforcement learning, specifically storing entire trajectories (episodes). It supports two main operations:\n", "\n", "1. **Storing Trajectories**: \n", " - `add_trajectories(new_trajectories)`: Appends new trajectories to the buffer. If the capacity is exceeded, it overwrites old trajectories in a circular manner.\n", "\n", "2. **Sampling Sub-Trajectories**: \n", " - `sample_sub_trajectory(k, n, discount)`: Randomly selects a single trajectory from the buffer, chooses a random start index within that trajectory, and extracts a sub-trajectory of length `k` for training. It also computes an `n`-step return for each step in the sub-trajectory, taking into account the discount factor. \n", " - `sample_batch(batch_size, k, n, discount)`: Repeats the above sub-trajectory sampling multiple times to form a training batch.\n", "\n", "## Data Structure Returned by `sample_sub_trajectory`\n", "\n", "`sample_sub_trajectory` returns a dictionary with the following keys:\n", "\n", "- **\"obs\"**: The observation at the starting point of the sub-trajectory. \n", " - Type/Shape: Typically a tensor (or array) representing the state. \n", "- **\"pi\"**: A list of policy distributions (one per timestep in the sub-trajectory). \n", " - Type/Shape: Each entry is usually a tensor of shape `(num_actions,)`, giving the probability distribution over actions. 
\n", "- **\"v\"**: (If included in your design) a list of value estimates for each step in the sub-trajectory. \n", " - Type/Shape: Each entry could be a tensor of shape `(1,)` or just a single scalar representing the value. \n", "- **\"actions\"**: A list of integers (or tensors) indicating which action was taken at each step. \n", " - Type/Shape: Each entry is typically an integer in the range `[0, num_actions-1]`. \n", "- **\"rewards\"**: A list of rewards obtained for each step in the sub-trajectory. \n", " - Type/Shape: Each entry is typically a scalar tensor (or float). \n", "- **\"return\"**: A list of n-step returns corresponding to each timestep in the sub-trajectory. \n", " - Type/Shape: Each entry is usually a scalar tensor (or float) representing the discounted sum of rewards plus a discounted bootstrap value.\n", "\n", "Overall, this dictionary encapsulates all the necessary data (observation, actions, rewards, policy, value, returns) for training, allowing algorithms to perform n-step updates or other forms of training that depend on short sequences of experience.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "##############################################\n", "# Buffer Replay (Experience)\n", "##############################################\n", "class BufferReplay:\n", "    \"\"\"\n", "    Stores entire episodes (trajectories), then allows sampling\n", "    sub-portions for training (k-step unroll & n-step return).\n", "    \"\"\"\n", "\n", "    def __init__(self, capacity, num_actions):\n", "        self.capacity = capacity\n", "        self.memory = []\n", "        self.position = 0\n", "        self.num_actions = num_actions\n", "\n", "    def add_trajectories(self, new_trajectories):\n", "        \"\"\"\n", "        Inserts new trajectories (episodes) into the memory buffer.\n", "\n", "        Detailed Functionality:\n", "        -----------------------\n", "        1. Iterates over each trajectory in `new_trajectories`.\n", "        2. 
If the buffer is not yet at `capacity`, append the new trajectory.\n", "        3. If at capacity, overwrite the oldest trajectory in a circular manner.\n", "        4. Updates the internal `position` pointer accordingly.\n", "\n", "        Parameters:\n", "        -----------\n", "        new_trajectories : list\n", "            A list of trajectory dictionaries, each containing keys like:\n", "            - \"obs\": list of observations\n", "            - \"actions\": list of actions\n", "            - \"rewards\": list of rewards\n", "            - \"pis\": list of policy distributions\n", "            - \"vs\": list of value estimates (if present)\n", "            - \"length\": an integer specifying the number of timesteps\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        pass\n", "\n", "    def sample_sub_trajectory(self, k, n, discount):\n", "        \"\"\"\n", "        Randomly picks a single stored trajectory and a start index,\n", "        returning k-step unroll data plus n-step returns.\n", "\n", "        Detailed Functionality:\n", "        -----------------------\n", "        1. Selects a random trajectory from the stored buffer.\n", "        2. Chooses a random start index within that trajectory.\n", "        3. Unrolls for `k` steps to gather:\n", "            - Observations\n", "            - Actions\n", "            - Rewards\n", "            - Policy distributions\n", "        4. Computes the n-step return for each of these k steps:\n", "            - Sums the discounted rewards\n", "            - May add a bootstrap value if within bounds of the trajectory\n", "        5. 
Handles out-of-bounds cases where the unroll extends beyond\n", "           the trajectory length, using padding or default values where necessary.\n", "\n", "        Parameters:\n", "        -----------\n", "        k : int\n", "            The number of timesteps to unroll the environment from the chosen start index.\n", "        n : int\n", "            The n-step horizon for return calculation.\n", "        discount : float\n", "            The discount factor used for return computation.\n", "\n", "        Returns:\n", "        --------\n", "        dict\n", "            A dictionary with keys:\n", "            - \"obs\": The initial observation at the start index.\n", "            - \"pi\": List of policy distributions for each step (length `k+1`).\n", "            - \"v\": (Optional, if your code includes it) List of value estimates per step.\n", "            - \"actions\": List of actions taken (length `k+1`, including potential padding).\n", "            - \"rewards\": List of rewards for each step in the unroll (length `k`).\n", "            - \"return\": A list of computed n-step returns (length `k+1`).\n", "        \"\"\"\n", "\n", "        data = {\n", "            \"obs\": None,\n", "            \"pi\": [],\n", "            \"v\": [],\n", "            \"actions\": [],\n", "            \"rewards\": [],\n", "            \"return\": [],\n", "        }\n", "\n", "        # Choose a random trajectory\n", "        mem_idx = np.random.choice(len(self.memory), 1)[0]\n", "        chosen_length = self.memory[mem_idx][\"length\"]\n", "        last_idx = chosen_length - 1\n", "\n", "        # Random start\n", "        start = np.random.choice(chosen_length, 1)[0]\n", "\n", "        # We'll record the initial observation\n", "        data[\"obs\"] = self.memory[mem_idx][\"obs\"][start]\n", "\n", "        # Collect data for each unroll step\n", "        for step in range(start, start + k + 1):\n", "            lookahead = step + n\n", "\n", "            # If looking beyond trajectory end, v_n = 0\n", "            if lookahead >= last_idx:\n", "                future_value = torch.tensor([0.0], device=device, dtype=dtype)\n", "            else:\n", "                future_value = self.memory[mem_idx][\"vs\"][lookahead] * (discount**n)\n", "\n", "            # sum of discounted rewards up to n or end\n", "            total_val = future_value.clone()\n", "            max_reward_idx = min(last_idx, 
lookahead)\n", "            enumer_rewards = list(\n", "                enumerate(self.memory[mem_idx][\"rewards\"][step:max_reward_idx])\n", "            )\n", "            for i, single_r in enumer_rewards:\n", "                total_val += single_r * (discount**i)\n", "\n", "            data[\"return\"].append(total_val)\n", "\n", "            # Not storing reward for the very initial step in the unroll\n", "            if step != start:\n", "                if 0 < step <= last_idx:\n", "                    data[\"rewards\"].append(self.memory[mem_idx][\"rewards\"][step - 1])\n", "                else:\n", "                    data[\"rewards\"].append(torch.tensor([0.0], device=device))\n", "\n", "            # Pi distribution\n", "            if 0 <= step < last_idx:\n", "                data[\"pi\"].append(self.memory[mem_idx][\"pis\"][step])\n", "            else:\n", "                # In case we are beyond the real trajectory\n", "                uniform_probs = np.ones(self.num_actions) / self.num_actions\n", "                data[\"pi\"].append(torch.tensor(uniform_probs, dtype=dtype))\n", "\n", "        # Build the real set of actions from the actual trajectory\n", "        max_valid_step = min(last_idx - 1, start + k - 1)\n", "        num_steps_valid = max_valid_step - start\n", "        data[\"actions\"] = self.memory[mem_idx][\"actions\"][\n", "            start : start + num_steps_valid + 1\n", "        ]\n", "\n", "        # Fill with random actions if we unroll beyond the stored trajectory\n", "        fill_count = k - num_steps_valid + 1\n", "        for _ in range(fill_count):\n", "            rand_act = np.random.choice(self.num_actions, 1)[0]\n", "            data[\"actions\"].append(rand_act)\n", "\n", "        return data\n", "\n", "    def sample_batch(self, batch_size, k, n, discount=0.99):\n", "        \"\"\"\n", "        Returns a batch (list) of sub trajectories. 
Each item\n", "        in the batch has the keys: obs, pi, v, actions, rewards, return\n", "\n", "        Parameters:\n", "        -----------\n", "        batch_size : int\n", "            Number of sub-trajectories to sample.\n", "        k : int\n", "            Number of unroll steps in each sub-trajectory.\n", "        n : int\n", "            Horizon for n-step return.\n", "        discount : float\n", "            Discount factor.\n", "\n", "        Returns:\n", "        --------\n", "        list\n", "            A list of dictionaries, each containing sub-trajectory data\n", "            (with the same keys as described in `sample_sub_trajectory`).\n", "        \"\"\"\n", "\n", "        batch_data = []\n", "        for _ in range(batch_size):\n", "            sample = self.sample_sub_trajectory(k, n, discount)\n", "            batch_data.append(sample)\n", "        return batch_data\n", "\n", "    def __len__(self):\n", "        \"\"\"\n", "        Returns the current number of stored trajectories in memory.\n", "        \"\"\"\n", "        return len(self.memory)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Representation, Dynamics, and Prediction Models\n", "\n", "These three neural network classes (`RepresentationNet`, `DynamicsNet`, `PredictionNet`) work together to model different aspects of an environment in a reinforcement learning (RL) setting, often inspired by algorithms like MuZero or similar approaches.\n", "\n", "1. **`RepresentationNet`** \n", " - **Goal**: Transform a raw observation (from the environment) into a **hidden state** vector. \n", " - **Role in RL**: This hidden state often serves as a compact, learned representation of the environment’s information and is used by subsequent models (such as `DynamicsNet` and `PredictionNet`) to make predictions about future states, rewards, policies, and values.\n", "\n", "2. **`DynamicsNet`** \n", " - **Goal**: Predict the **next hidden state** and **immediate reward** from the **current hidden state** and an encoded action. 
\n", " - **Role in RL**: This allows a learning algorithm to “imagine” how the environment transitions forward without directly querying the real environment, enabling planning and lookahead.\n", "\n", "3. **`PredictionNet`** \n", " - **Goal**: Given a hidden state, output a **policy** (probability distribution over possible actions) and a **value estimate** (scalar). \n", " - **Role in RL**: \n", " - The **policy** guides action selection, telling us which actions might be best at each timestep. \n", " - The **value estimate** helps evaluate how good or bad a particular state is, assisting with learning.\n", "\n", "## Form of the Desired Outputs\n", "- **`RepresentationNet`**: \n", " Outputs a **hidden state** tensor of shape `(batch_size, hidden_dim)`.\n", "- **`DynamicsNet`**: \n", " Outputs two tensors: \n", " 1. **Next hidden state** of shape `(batch_size, hidden_dim)` \n", " 2. **Reward estimate** of shape `(batch_size,)`\n", "- **`PredictionNet`**: \n", " Outputs two tensors: \n", " 1. **Policy** of shape `(batch_size, num_actions)`, representing a probability distribution over actions. \n", " 2. 
**Value** of shape `(batch_size,)`, representing a scalar estimate of the state value.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dTamqwyJGhik" }, "outputs": [], "source": [ "##############################################\n", "# Representation, Dynamics, Prediction models\n", "##############################################\n", "class RepresentationNet(nn.Module):\n", "    \"\"\"\n", "    Maps an environment observation into a hidden state.\n", "\n", "    In Detail:\n", "    ----------\n", "    This network transforms a raw input observation (e.g., states, images, or other features)\n", "    into a latent or hidden state representation that captures the essential information.\n", "\n", "    Form of Output:\n", "    ---------------\n", "    - A hidden state tensor of shape (batch_size, hidden_dim).\n", "\n", "    TODO:\n", "    -----\n", "    Your code\n", "    \"\"\"\n", "\n", "    def __init__(self, input_dim, hidden_dim):\n", "        \"\"\"\n", "        Initializes layers required to convert `input_dim` features\n", "        into a `hidden_dim`-dimensional vector.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        super().__init__()\n", "\n", "    def forward(self, x):\n", "        \"\"\"\n", "        Forward pass that takes a batch of observations `x` and\n", "        returns the corresponding hidden state representation.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        pass\n", "\n", "\n", "class DynamicsNet(nn.Module):\n", "    \"\"\"\n", "    Predicts the next hidden state and immediate reward\n", "    from the current hidden state and action encoding.\n", "\n", "    In Detail:\n", "    ----------\n", "    Given a current hidden state and an action, this network outputs:\n", "    - Next hidden state (shape: (batch_size, hidden_dim))\n", "    - A scalar reward estimate (shape: (batch_size,))\n", "\n", "    The action is typically encoded as a one-hot or scalar that the network\n", "    incorporates as part of the input layer.\n", "\n", "    TODO:\n", "    -----\n", "    Your code\n", "    \"\"\"\n", "\n", "    def 
__init__(self, hidden_dim, action_space):\n", "        \"\"\"\n", "        Initializes layers that combine `hidden_dim` features and\n", "        an encoded action to produce the next hidden state and reward.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        super().__init__()\n", "\n", "    def forward(self, x):\n", "        \"\"\"\n", "        Forward pass that takes a batch consisting of the current hidden state\n", "        and the encoded action, returning the next hidden state and reward estimate.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        pass\n", "\n", "\n", "class PredictionNet(nn.Module):\n", "    \"\"\"\n", "    Given a hidden state, outputs:\n", "    - A policy distribution (size = num_actions)\n", "    - A scalar value estimate (size = 1 per batch item)\n", "\n", "    In Detail:\n", "    ----------\n", "    The model reads the hidden state (shape: (batch_size, hidden_dim)) and produces:\n", "    - `policy`: A probability distribution over actions (shape: (batch_size, num_actions)).\n", "    - `value`: A scalar estimate for each hidden state (shape: (batch_size,)).\n", "\n", "    TODO:\n", "    -----\n", "    Your code\n", "    \"\"\"\n", "\n", "    def __init__(self, hidden_dim, num_actions):\n", "        \"\"\"\n", "        Initializes layers to map a hidden state into policy logits\n", "        (of length `num_actions`) and a scalar value estimate.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        super().__init__()\n", "\n", "    def forward(self, hidden_x):\n", "        \"\"\"\n", "        Forward pass that takes a batch of hidden states `hidden_x`\n", "        and produces the policy distribution and value estimate.\n", "\n", "        TODO:\n", "        -----\n", "        Your code\n", "        \"\"\"\n", "        pass\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# MCTS And Naive Depth-Based Search\n", "\n", "In this section, we have several classes and functions that support Monte Carlo Tree Search (MCTS) and a naive depth-based search approach:\n", "\n", "1. 
**`AdaptiveNormalizer`** \n", " - Tracks the running minimum and maximum values of inputs (e.g., Q-values). \n", " - Allows for on-the-fly normalization of these values, which can stabilize the search process.\n", "\n", "2. **`TreeNode`** \n", " - Represents a node in the MCTS search tree. \n", " - Stores statistics such as visit counts, the sum of value estimates, child edges, and prior probabilities. \n", " - The node can also store a hidden state representation and an estimated reward for transitioning from a parent node.\n", "\n", "3. **`MCTS` Class** \n", " - Controls the MCTS search process. \n", " - Contains key methods like `run`, `_expand_node`, `_backpropagate`, `_calc_ucb`, and `_compute_pi` for performing MCTS simulations, updating node statistics, and calculating action visit distributions. \n", " - Uses a dynamics model and prediction model to simulate the environment forward and estimate policies/values.\n", "\n", "4. **`naive_depth_search` Function** \n", " - Demonstrates a simpler (but inefficient) depth-based search. \n", " - Expands all actions up to a certain depth, computes rewards and leaf values, then picks the action sequence with the highest total return.\n", "\n", "## Form of the Desired Outputs\n", "\n", "- **`AdaptiveNormalizer`**: \n", " - Provides two primary methods: `update(val)` to update min/max stats and `normalize(val)` to scale values into a normalized range.\n", "\n", "- **`TreeNode`**: \n", " - Stores `edges` mapping each possible action to another `TreeNode`. \n", " - Maintains `avg_value` (the mean of all values backpropagated through it), `visit_count`, and `total_value_sum`. \n", " - It may store `state_rep` (the current hidden state representation) and `reward_est` (the immediate reward from the parent node to itself).\n", "\n", "- **`MCTS`**: \n", " 1. **`run(sims_count, root_state)`**: \n", " - Orchestrates the entire MCTS procedure for a given number of simulations (`sims_count`). 
\n", " - Returns the **visit counts** over actions (used to derive a policy) and the **average value** of the root node. \n", " 2. **`_expand_node(parent_node, new_node, chosen_action)`**: \n", " - Expands the leaf node by calling the environment (or model) with a chosen action, obtaining the next hidden state, policy, value, and reward. \n", " - Creates children edges for all possible actions from the new state. \n", " - Returns the new value estimate for further backpropagation. \n", " 3. **`_backpropagate(leaf_value)`**: \n", " - Starts at the leaf node and moves backward through the search path, updating each node’s visit count and total value sum with the discounted reward. \n", " 4. **`_calc_ucb(parent, child)`**: \n", " - Calculates the Upper Confidence Bound (UCB) score for a given child node. \n", " - Uses the node’s prior probability, visit counts, and normalized value estimates to balance exploration and exploitation. \n", " - Returns a scalar score that can be used to pick the best child action. \n", " 5. **`_compute_pi()`**: \n", " - Aggregates the visit counts for each action at the root node, which can be used as an approximate policy distribution.\n", "\n", "- **`naive_depth_search`**: \n", " - Explores every possible action combination up to a specified depth (`search_depth`). 
\n", " - Accumulates reward and discounted values, then selects the best action path found.\n", "\n", "Below, the bodies of certain functions have been removed and replaced with detailed docstrings explaining the required logic, along with a `TODO: Your code` placeholder.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iv8r4gPrGiBl" }, "outputs": [], "source": [ "##############################################\n", "# MCTS Classes\n", "##############################################\n", "class AdaptiveNormalizer:\n", "    \"\"\"\n", "    Simple min-max normalizer that tracks running min/max\n", "    so we can scale Q-values in MCTS.\n", "\n", "    Form of Operation:\n", "    ------------------\n", "    - Tracks the minimum and maximum values observed so far.\n", "    - After each update, incoming values can be scaled to [0,1].\n", "    \"\"\"\n", "\n", "    def __init__(self):\n", "        self.max_val = float(\"-inf\")\n", "        self.min_val = float(\"inf\")\n", "\n", "    def update(self, val):\n", "        \"\"\"\n", "        Updates the running minimum and maximum with the new value.\n", "\n", "        Parameters:\n", "        -----------\n", "        val : torch.Tensor\n", "            The new value for which we update min/max stats.\n", "        \"\"\"\n", "        val_cpu = val.cpu()\n", "        self.max_val = max(self.max_val, val_cpu)\n", "        self.min_val = min(self.min_val, val_cpu)\n", "\n", "    def normalize(self, val):\n", "        \"\"\"\n", "        Scales the given value into the [0,1] range based on current min/max.\n", "\n", "        Parameters:\n", "        -----------\n", "        val : torch.Tensor\n", "            The value to be normalized.\n", "\n", "        Returns:\n", "        --------\n", "        torch.Tensor\n", "            The normalized value in [0,1] if max_val > min_val,\n", "            otherwise returns the original value.\n", "        \"\"\"\n", "        val_cpu = val.cpu()\n", "        if self.max_val > self.min_val:\n", "            return ((val_cpu - self.min_val) / (self.max_val - self.min_val)).to(device)\n", "        return val_cpu\n", "\n", "\n", "class TreeNode:\n", "    \"\"\"\n", "    Node for MCTS: stores children edges, prior,\n", 
" aggregated value stats, and so on.\n", "\n", " Attributes:\n", " -----------\n", " prior_prob : float\n", " The prior probability (from a policy network) of choosing\n", " the action that leads to this node from its parent.\n", " state_rep : torch.Tensor or None\n", " The hidden state representation if available.\n", " reward_est : torch.Tensor or float\n", " The immediate reward predicted for the transition from the parent node.\n", " edges : dict\n", " A dictionary mapping each action to a child TreeNode.\n", " total_value_sum : float\n", " Sum of all backpropagated values.\n", " visit_count : int\n", " Number of times this node has been visited.\n", " \"\"\"\n", "\n", " def __init__(self, prior_prob):\n", " self.prior_prob = prior_prob\n", " self.state_rep = None\n", " self.reward_est = None\n", " self.edges = {} # action -> TreeNode\n", "\n", " self.total_value_sum = 0.0\n", " self.visit_count = 0\n", "\n", " def is_expanded(self):\n", " \"\"\"\n", " Checks if the node has any child edges.\n", "\n", " Returns:\n", " --------\n", " bool\n", " True if there is at least one child edge, otherwise False.\n", " \"\"\"\n", " return len(self.edges) > 0\n", "\n", " def avg_value(self):\n", " \"\"\"\n", " Computes the average value estimate (total_value_sum / visit_count).\n", "\n", " Returns:\n", " --------\n", " float\n", " The mean value estimate. 
Returns 0.0 if visit_count is zero.\n", " \"\"\"\n", " if self.visit_count == 0:\n", " return 0.0\n", " return self.total_value_sum / self.visit_count\n", "\n", "\n", "##############################################\n", "# Utility functions\n", "##############################################\n", "\n", "\n", "def minmax_normalize_state(s):\n", " \"\"\"\n", " Min-max scales each row of the input tensor to the [0,1] range.\n", " Not safe for all inputs: a row whose entries are all equal gives a zero\n", " denominator and NaN output. Included as an example.\n", "\n", " Parameters:\n", " -----------\n", " s : torch.Tensor\n", " The state tensor to normalize (shape: [batch_size, state_dim])\n", "\n", " Returns:\n", " --------\n", " torch.Tensor\n", " The normalized state tensor (element-wise scaled to [0,1]).\n", " \"\"\"\n", " b_size = s.shape[0]\n", " s_min = torch.min(s, dim=1)[0].reshape(b_size, 1)\n", " s_max = torch.max(s, dim=1)[0].reshape(b_size, 1)\n", " return (s - s_min) / (s_max - s_min)\n", "\n", "\n", "class MCTS:\n", " \"\"\"\n", " Runs MCTS simulations to select actions.\n", " Has:\n", " - root exploration noise\n", " - expansions\n", " - backup of value\n", " - UCB calculation\n", "\n", " Methods of Interest:\n", " --------------------\n", " - run(sims_count, root_state): Orchestrates the entire MCTS procedure.\n", " - _expand_node(parent_node, new_node, chosen_action): Creates children edges for a leaf node.\n", " - _backpropagate(leaf_value): Updates nodes' stats from leaf to root.\n", " - _calc_ucb(parent, child): Calculates UCB score for a child node.\n", " - _compute_pi(): Gathers visit counts at the root node.\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " num_actions,\n", " dynamics_model,\n", " predict_model,\n", " controlling_agent,\n", " gamma=0.99,\n", " ):\n", " self.num_actions = num_actions\n", " # TODO: Initialize the exploration constants\n", " self.c1 = ...\n", " self.c2 = ...\n", " self.gamma = gamma\n", "\n", " self.root_dirichlet_alpha = 0.25\n", " self.root_exploration_fraction = 
0.25\n", "\n", " self.dynamics_model = dynamics_model\n", " self.prediction_model = predict_model\n", " self.agent = controlling_agent\n", " self.value_tracker = None # Assigned new for each search\n", "\n", " def run(self, sims_count, root_state):\n", " \"\"\"\n", " Orchestrates the entire MCTS process from a given root state.\n", " Steps:\n", " ------\n", " 1. Create a root node with an initial policy from `prediction_model`.\n", " 2. Perform `sims_count` simulations. In each simulation:\n", " - Traverse down the tree using a UCB-based action selection.\n", " - Expand a leaf node.\n", " - Backpropagate the value estimate up the search path.\n", " 3. Compute the policy distribution (visit counts) for the root node.\n", "\n", " Parameters:\n", " -----------\n", " sims_count : int\n", " Number of MCTS simulations to run.\n", " root_state : torch.Tensor\n", " The hidden state representation at the root.\n", "\n", " Returns:\n", " --------\n", " (np.array, float)\n", " - An array of visit counts for each action at the root.\n", " - The average value of the root node.\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", "\n", " # Create the root\n", " init_policy, init_value = self.prediction_model(root_state)\n", " init_policy, init_val = init_policy.detach(), init_value.detach()\n", " self.root_node = self._initialize_root(root_state, init_policy)\n", "\n", " # track min/max for value normalization\n", " self.value_tracker = ...\n", "\n", " # Perform MCTS simulations\n", " for _ in range(sims_count):\n", " self.search_path = []\n", " self.search_path.append(self.root_node)\n", " self.action_path = []\n", " \n", " current_node = ...\n", " # Traverse down the tree\n", " # A loop that goes down the tree until a leaf is reached\n", " while (...):\n", " act_chosen, next_node = ...\n", "\n", " # Expand the newly reached leaf\n", " leaf_parent = self.search_path[-2]\n", " new_value = ...\n", "\n", " # Backup\n", " self._backpropagate(new_value)\n", "\n", " # 
Return (visit distribution, root value)\n", " # ...\n", "\n", " pass\n", "\n", " def _expand_node(self, parent_node, new_node, chosen_action):\n", " \"\"\"\n", " Expands a leaf node by querying the environment or model.\n", " Steps:\n", " ------\n", " 1. Performs a rollout step from `parent_node.state_rep` using `chosen_action`.\n", " 2. Receives next hidden state, policy, value, and reward estimates.\n", " 3. Updates `new_node` with the next hidden state and reward.\n", " 4. Creates child edges for every possible action, storing them in `new_node.edges`.\n", " 5. Returns the new value estimate to be used in backpropagation.\n", "\n", " Parameters:\n", " -----------\n", " parent_node : TreeNode\n", " The node from which we are expanding.\n", " new_node : TreeNode\n", " The newly reached node (initially empty).\n", " chosen_action : int\n", " The action index that led us to `new_node`.\n", "\n", " Returns:\n", " --------\n", " torch.Tensor\n", " The value estimate for the new node (shape: (1,) or appropriate).\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", "\n", " next_s, new_pi, new_v, new_reward = ...\n", "\n", " # detach the new values\n", " # ...\n", " \n", " # update the new node\n", " new_node.state_rep = ...\n", " new_node.reward_est = ...\n", "\n", " # create children edges\n", " # ...\n", "\n", " pass\n", "\n", " def _backpropagate(self, leaf_value):\n", " \"\"\"\n", " Moves up the search path, updating each node's total_value_sum and\n", " visit_count with the discounted sum of rewards + leaf_value.\n", "\n", " Steps:\n", " ------\n", " 1. Start from the leaf node, move to the parent node, etc., up to the root.\n", " 2. Each step, add `leaf_value` to the node's total_value_sum and increment visit_count.\n", " 3. Update the normalizer with the node's (reward_est + gamma * node_value).\n", " 4. 
Propagate leaf_value upward: leaf_value = reward_est + gamma * leaf_value.\n", "\n", " Parameters:\n", " -----------\n", " leaf_value : torch.Tensor or float\n", " The value obtained at the leaf, which is iteratively updated as we move up.\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", " for node in (...):\n", " node.total_value_sum = ...\n", " node.visit_count = ...\n", " \n", " \n", " self.value_tracker.update(...)\n", " \n", " leaf_value = ...\n", " \n", " def _select_ucb_action(self, node):\n", " \"\"\"\n", " Selects the action with the highest UCB score among the children of `node`.\n", "\n", " Parameters:\n", " -----------\n", " node : TreeNode\n", " The node for which we want to choose an action.\n", "\n", " Returns:\n", " --------\n", " (int, TreeNode)\n", " - The action index.\n", " - The corresponding child node.\n", " \"\"\"\n", " # Evaluate UCB for each child\n", " ucb_scores = [\n", " self._calc_ucb(node, node.edges[a]) for a in range(self.num_actions)\n", " ]\n", " best_act = np.argmax(ucb_scores)\n", " return best_act, node.edges[best_act]\n", "\n", " def _calc_ucb(self, parent, child):\n", " \"\"\"\n", " Calculates the PUCT (Upper Confidence Bound) score for a child node.\n", "\n", " Detailed Implementation Steps:\n", " ------------------------------\n", " 1. **Exploration Term**:\n", " - Compute a factor (often denoted `pb_c`) that depends on the parent's total visits (`parent.visit_count`),\n", " constants `self.c1`, `self.c2`, and the child's number of visits (`child.visit_count`).\n", " - Typically, this involves a term like:\n", " `pb_c = log((parent.visit_count + c2 + 1) / c2) + c1`\n", " and a multiplier of `sqrt(parent.visit_count) / (child.visit_count + 1)`.\n", " 2. **Prior Probability**:\n", " - Multiply `pb_c` by the child's prior probability (`child.prior_prob`).\n", " - This encourages exploring children with higher prior probabilities.\n", " 3. 
**Value Term**:\n", " - Add the child's estimated reward (`child.reward_est`) to `self.gamma * child.avg_value()`.\n", " - Normalize this value using the search's `self.value_tracker` (if available) to keep scores in a comparable range.\n", " 4. **Combine Exploration and Value**:\n", " - Sum the exploration term (including prior) and the normalized value term to get the final UCB score.\n", " 5. **Return UCB Score**:\n", " - Convert the result to a float and return it as the child's PUCT or UCB score.\n", "\n", " Parameters:\n", " -----------\n", " parent : TreeNode\n", " The node whose children we are evaluating.\n", " child : TreeNode\n", " The child node for which we calculate the UCB score.\n", "\n", " Returns:\n", " --------\n", " float\n", " The computed UCB score of the form:\n", " exploration_term (depends on prior) + normalized_value_term.\n", "\n", " TODO:\n", " -----\n", " Your code\n", "\n", " \"\"\"\n", "\n", " # Compute the exploration factor (pb_c) using c1, c2, parent.visit_count, etc.\n", " pb_c = ...\n", "\n", " # Multiply pb_c by child.prior_prob\n", " prior_val = ...\n", "\n", " # Calculate the value term = reward + gamma * avg_value\n", " if child.visit_count > 0:\n", " val_score = ...\n", " else:\n", " val_score = ...\n", " \n", "\n", " # Normalize the value term if needed\n", " # Add exploration and value to get final score\n", "\n", " pass\n", "\n", " def _compute_pi(self):\n", " \"\"\"\n", " Collects the visit counts of each child of the root node to form a policy distribution.\n", "\n", " Steps:\n", " ------\n", " 1. For each action in range(num_actions), retrieve the visit_count from the root's children.\n", " 2. 
Return these counts as a NumPy array.\n", "\n", " Returns:\n", " --------\n", " np.array\n", " An array of size (num_actions,) containing the visit counts at the root.\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", " visits = []\n", " # For each possible action from root, gather visits\n", "\n", " pass\n", "\n", " def _add_root_noise(self, root):\n", " \"\"\"\n", " Adds Dirichlet noise to the root node's child prior probabilities\n", " to encourage exploration.\n", " \"\"\"\n", " noise = np.random.dirichlet([self.root_dirichlet_alpha] * self.num_actions)\n", " frac = self.root_exploration_fraction\n", " for act_id, n_val in zip(range(self.num_actions), noise):\n", " root.edges[act_id].prior_prob = (\n", " root.edges[act_id].prior_prob * (1 - frac) + n_val * frac\n", " )\n", " return root\n", "\n", " def _initialize_root(self, root_tensor, p_init):\n", " \"\"\"\n", " Initializes the root node with policy priors for each action and adds root noise.\n", " \"\"\"\n", " p_init = p_init.detach().cpu().numpy()\n", " node = TreeNode(0)\n", " node.state_rep = root_tensor\n", " node.reward_est = 0\n", "\n", " for i in range(self.num_actions):\n", " node.edges[i] = TreeNode(p_init[0, i])\n", "\n", " # Add exploration noise\n", " node = self._add_root_noise(node)\n", " return node\n", "\n", "\n", "def naive_depth_search(agent, hidden_s, act_count, gamma_val, search_depth=3):\n", " \"\"\"\n", " Very naive search that fully expands all actions up to a given depth,\n", " tracks predicted reward + discounted value at the leaf, and picks the best\n", " root action based on these expansions.\n", "\n", " Detailed Implementation Steps:\n", " ------------------------------\n", " 1. **Enumerate Actions**:\n", " - Generate every possible sequence of actions (branches) up to `search_depth`.\n", " This involves creating repeated states/actions for each level.\n", " 2. 
**Rollout Step**:\n", " - For each action sequence, call `agent.rollout_step` to predict:\n", " (a) The next hidden state (`next_s`)\n", " (b) The predicted value at the next state (`leaf_val`)\n", " (c) The immediate reward (`leaf_r`)\n", " - This is done repeatedly for each depth layer, accumulating rewards.\n", " 3. **Discounting**:\n", " - Multiply the reward at each depth by `gamma_val^depth`.\n", " - Similarly, the final leaf value is discounted by `gamma_val^search_depth`.\n", " 4. **Summation**:\n", " - Compute the total return by adding all discounted rewards to the final discounted value.\n", " 5. **Select Best Sequence**:\n", " - Determine which action sequence yields the highest total return.\n", " - Return the *first action* of that best sequence and the root value estimate.\n", "\n", " Parameters:\n", " -----------\n", " agent : object\n", " The agent that provides a `pred_net` module and a `rollout_step` method.\n", " hidden_s : torch.Tensor\n", " The current hidden state representation (shape: (1, hidden_dim)).\n", " act_count : int\n", " The number of possible actions in the environment.\n", " gamma_val : float\n", " The discount factor applied to future rewards and values.\n", " search_depth : int\n", " The maximum depth up to which actions are enumerated.\n", "\n", " Returns:\n", " --------\n", " (int, torch.Tensor)\n", " - The best action to take at the root (index into the action space).\n", " - The estimated root value from `agent.pred_net`.\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", "\n", " # Initialize any data structures for storing accumulated rewards/values\n", " possible_acts = np.arange(act_count)\n", "\n", " # Just get the root value (the Agent class stores its prediction network as `pred_net`)\n", " _, root_v = agent.pred_net(hidden_s)\n", " root_value = None\n", "\n", " combined_rewards = torch.tensor([0.0], device=device)\n", "\n", " # For depth in range(search_depth), enumerate actions and states\n", " for depth in range(search_depth):\n", " state = ... 
# repeat the states for each action\n", " repeated_acts = ...\n", "\n", " # Use agent.rollout_step() to get next_s, leaf_val, leaf_r for each branch\n", "\n", " # Keep in mind to detach the tensors; you don't want the gradients to flow back to the model\n", "\n", " # Expand reward sum\n", " combined_rewards = ... # repeat the rewards for each action\n", " adjusted_r = ...\n", " combined_rewards += adjusted_r\n", "\n", " # Accumulate discounted rewards and final value\n", " final_vals = ...\n", " # total\n", " final_vals = ...\n", "\n", " # Determine which sequence has the maximum total return\n", "\n", " # Pick the first action from that sequence and return it alongside root value\n", "\n", " pass\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# `Agent` Class\n", "\n", "The `Agent` class is a neural network module that coordinates three components:\n", "- A **representation network** (`rep_net`) to encode raw observations into a hidden state.\n", "- A **dynamics network** (`dyn_net`) to predict the next hidden state and reward given a current hidden state and action.\n", "- A **prediction network** (`pred_net`) to output a policy distribution and value estimate from a hidden state.\n", "\n", "The agent can use different search strategies to pick actions:\n", "1. **MCTS (Monte Carlo Tree Search)** \n", " - When `search_type=\"mcts\"`, the agent constructs an MCTS object to run simulations and pick an action from the resulting search tree.\n", "2. **Naive Depth Search** \n", " - When `search_type=\"naive\"`, the agent expands every action sequence up to a specified `search_depth` and chooses the first action of the sequence with the highest estimated return.\n", "3. 
**Direct Prediction** \n", " - Otherwise, the agent relies purely on the `pred_net`’s policy and value to pick actions without search.\n", "\n", "Key Methods:\n", "- **`forward(obs)`**: Overrides the PyTorch module’s forward method (not used directly in the search logic here).\n", "- **`inference(obs_tensor)`**: Determines the action to take given an input observation. This may involve MCTS or naive search, or direct prediction, depending on the agent’s settings.\n", "- **`initial_step(obs)`**: Produces the hidden state representation and an initial policy/value from a raw observation. Useful for MCTS when creating the root node.\n", "- **`rollout_step(hidden_s, chosen_actions)`**: Given hidden states and chosen actions, predicts the next hidden states, immediate rewards, and the policy/value at those next states.\n", "\n", "## Desired Outputs\n", "- **`inference(obs_tensor)`**:\n", " - **action_int**: An integer action index chosen by the search or direct policy sampling.\n", " - **policy_distribution**: A NumPy array (or torch tensor) of shape `(num_actions,)`, representing the probability distribution over actions.\n", " - **estimated_value**: A scalar estimate (float or tensor) of the state’s value.\n", "\n", "- **`initial_step(obs)`**:\n", " - **s**: The hidden state tensor generated by `rep_net(obs)`.\n", " - **pol**: The policy distribution (shape: `(1, num_actions)`) predicted by the `pred_net`.\n", " - **v**: The value estimate (shape: `(1,)` or `(1,1)`) from the `pred_net`.\n", "\n", "- **`rollout_step(hidden_s, chosen_actions)`**:\n", " - **next_hidden**: The predicted next hidden state(s).\n", " - **p**: The predicted policy distribution(s).\n", " - **v**: The predicted value(s).\n", " - **predicted_reward**: The immediate reward estimate(s) from the dynamics network.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "##############################################\n", "# Agent\n", 
"##############################################\n", "class Agent(nn.Module):\n", " \"\"\"\n", " Agent with optional MCTS or naive search to pick actions.\n", "\n", " Attributes:\n", " -----------\n", " rep_net : nn.Module\n", " Network to encode observations into hidden states.\n", " dyn_net : nn.Module\n", " Network to predict next hidden states and rewards.\n", " pred_net : nn.Module\n", " Network to output a policy distribution and value estimate from a hidden state.\n", " num_actions : int\n", " Number of possible actions.\n", " gamma : float\n", " Discount factor for future rewards.\n", " search_type : str\n", " Indicates which search strategy to use (\"mcts\", \"naive\", or direct).\n", " simulations : int\n", " Number of MCTS simulations to run if MCTS is used.\n", " naive_search_depth : int\n", " Depth to explore for naive search.\n", " temperature : float\n", " Controls the softmax temperature when sampling actions from a policy.\n", " mcts : MCTS or None\n", " The MCTS object if search_type is \"mcts\"; otherwise None.\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " sim_count,\n", " act_count,\n", " rep_net,\n", " dyn_net,\n", " pred_net,\n", " search_type=\"mcts\",\n", " disc_factor=0.99,\n", " naive_len=3,\n", " ):\n", " super().__init__()\n", " self.rep_net = rep_net\n", " self.dyn_net = dyn_net\n", " self.pred_net = pred_net\n", "\n", " self.num_actions = act_count\n", " self.gamma = disc_factor\n", " self.search_type = search_type\n", " self.simulations = sim_count\n", " self.naive_search_depth = naive_len\n", " self.temperature = 1.0\n", "\n", " if self.search_type == \"mcts\":\n", " self.mcts = MCTS(\n", " act_count, dyn_net, pred_net, self, gamma=disc_factor\n", " )\n", " else:\n", " self.mcts = None\n", "\n", " def forward(self, obs):\n", " \"\"\"\n", " Placeholder forward pass to conform to PyTorch's nn.Module interface.\n", "\n", " Parameters:\n", " -----------\n", " obs : torch.Tensor\n", " The input observation.\n", " \"\"\"\n", " 
pass\n", "\n", " def inference(self, obs_tensor):\n", " \"\"\"\n", " Determines the action to take given an input observation, possibly using MCTS or naive depth search.\n", "\n", " Detailed Implementation Steps:\n", " ------------------------------\n", " 1. Convert the raw observation `obs_tensor` into a hidden state via `self.rep_net`.\n", " 2. If `mcts` is available, run MCTS simulations and derive action probabilities from visit counts.\n", " 3. If `search_type` is \"naive\", run naive_depth_search to pick the best action.\n", " 4. Otherwise, directly use `pred_net` to get a policy distribution and sample an action.\n", " 5. Return (picked_action, policy_distribution, estimated_value).\n", "\n", " Returns:\n", " --------\n", " (int, np.array, float or torch.Tensor)\n", " - Chosen action index (int).\n", " - Policy distribution over actions (np.array with shape (num_actions,)).\n", " - The estimated value of the state (float or tensor).\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", " # convert observation to hidden\n", " hidden = ...\n", "\n", " if self.mcts:\n", " # MCTS-based\n", " child_visits, root_val = ...\n", " action_probs = ...\n", "\n", " # Apply temperature\n", " adjusted_pi = ...\n", " picked_action = ...\n", " return picked_action, action_probs, root_val\n", " elif self.search_type == \"naive\":\n", " # naive search\n", " best_a, r_val = ...\n", " # either uniform or one-hot for distribution\n", " result_pi = np.zeros(self.num_actions, dtype=np.float32)\n", " result_pi[best_a] = 1.0\n", " return best_a, result_pi, r_val\n", " else:\n", " # direct prediction\n", " with torch.no_grad():\n", " pol, val = ...\n", " # sample from pol^1/T\n", " pol_np = ...\n", " pol_np = ...\n", " pol_np = ...\n", " chosen_act = ...\n", " return chosen_act, pol_np, val\n", "\n", " def initial_step(self, obs):\n", " \"\"\"\n", " Produces the initial hidden state, policy distribution, and value estimate for a root observation.\n", "\n", " Detailed 
Implementation Steps:\n", " ------------------------------\n", " 1. Encode the observation into a hidden state using `rep_net`.\n", " 2. Feed that hidden state into `pred_net` to obtain:\n", " - A policy distribution (shape: [1, num_actions])\n", " - A value estimate (shape: [1] or similar)\n", " 3. Return these three items.\n", "\n", " Returns:\n", " --------\n", " (torch.Tensor, torch.Tensor, torch.Tensor)\n", " - Hidden state (shape: [1, hidden_dim]).\n", " - Policy distribution (shape: [1, num_actions]).\n", " - Value estimate (shape: [1]).\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", " s = ...\n", " pol, v = ...\n", " return s, pol, v\n", "\n", " def rollout_step(self, hidden_s, chosen_actions):\n", " \"\"\"\n", " Given a batch of hidden states and chosen actions, predict the next hidden states, immediate rewards,\n", " and the subsequent policy/value.\n", "\n", " Detailed Implementation Steps:\n", " ------------------------------\n", " 1. Encode actions numerically (e.g., one-hot or scaled) and concatenate them with `hidden_s`.\n", " 2. Pass this concatenated input to `dyn_net` to get:\n", " - Next hidden states\n", " - Predicted reward\n", " 3. Use `pred_net` on the next hidden states to get:\n", " - Next policy distributions\n", " - Next value estimates\n", " 4. 
Return all four: next_hidden, policy, value, predicted_reward.\n", "\n", " Returns:\n", " --------\n", " (torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor)\n", " - next_hidden: shape (batch_size, hidden_dim)\n", " - policy: shape (batch_size, num_actions)\n", " - value: shape (batch_size,)\n", " - predicted_reward: shape (batch_size,)\n", "\n", " TODO:\n", " -----\n", " Your code\n", " \"\"\"\n", " batch_sz = ...\n", " # Normalize action to [0,1]\n", " act_enc = ...\n", " act_enc /= self.num_actions\n", "\n", " # feed dynamics\n", " dyn_input = torch.cat([hidden_s, act_enc], dim=1)\n", " next_hidden, predicted_reward = ...\n", "\n", " # get next policy + value\n", " p, v = ...\n", "\n", " return next_hidden, p, v, predicted_reward\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 576 }, "id": "j4um4-S5r17Z", "outputId": "17e692a3-7ce6-4cb3-8195-8eb50ae823a6" }, "outputs": [], "source": [ "def train_with_search_policy(search_type='mcts'):\n", " \"\"\"\n", " Trains a MuZero_Agent with the given `search_type` using\n", " your exact train-loop code. We only add reward logging/plotting.\n", " \"\"\"\n", " # Your same hyper-params:\n", " history_length = 3\n", " num_hidden = 50\n", " num_simulations = 20\n", " replay_capacity = 200\n", " batch_size = 32\n", " k = 5\n", " n = 10\n", " lr = 1e-3\n", " value_coef = 1\n", " reward_coef = 1\n", "\n", " # Environment\n", " raw_env = gym.make('CartPole-v0')\n", " num_obs_space = raw_env.observation_space.shape[0]\n", " num_actions = raw_env.action_space.n\n", " num_in = history_length * num_obs_space\n", " env = GymWrapper(raw_env, history_length)\n", "\n", " # Models\n", " # TODO: Create the models\n", " representation_model = ...\n", " dynamics_model = ...\n", " prediction_model = ...\n", "\n", " # --- Attach the chosen search mode to the MuZero agent. 
---\n", " # For example, inside MuZero_Agent you might have something like:\n", " # self.search_type = search_type\n", " # and you check it to decide how to plan (MCTS vs naive vs none).\n", "\n", " # TODO: Create the agent with the chosen search policy\n", " agent = ...\n", "\n", " # Our \"runner\" is replaced with a LoggingEnvRunner:\n", " runner = LoggingEnvRunner(env)\n", " replay_buffer = BufferReplay(replay_capacity, num_actions)\n", "\n", " mse_loss = ...\n", " optimizer = ...\n", "\n", " # -----------------------------------------------------------------\n", " # The training loop. We do NOT modify its logic.\n", " # We only rely on the fact that runner.run(agent) logs returns.\n", " # -----------------------------------------------------------------\n", " for episode in range(2000):\n", " trajectory = runner.run(agent)\n", " replay_buffer.add_trajectories([trajectory])\n", "\n", " if len(replay_buffer) < 15:\n", " continue\n", "\n", " # Some temperature scheduling (unchanged)\n", " if episode < 250:\n", " agent.temperature = 1\n", " elif episode < 300:\n", " agent.temperature = 0.75\n", " elif episode < 400:\n", " agent.temperature = 0.65\n", " elif episode < 500:\n", " agent.temperature = 0.55\n", " elif episode < 600:\n", " agent.temperature = 0.3\n", " else:\n", " agent.temperature = 0.25\n", "\n", "\n", " # TODO: Compute the loss and update the model. Keep in mind to detach the target values when computing the loss. 
\n", " # We do 16 mini-batch updates each episode:\n", " for i in range(16):\n", " optimizer.zero_grad()\n", "\n", " data = ...\n", "\n", " representation_in = torch.stack(\n", " [torch.flatten(data[i][\"obs\"]) for i in range(batch_size)]\n", " ).to(device).to(dtype)\n", "\n", " actions = np.stack([np.array(data[i][\"actions\"], dtype=np.int64)\n", " for i in range(batch_size)])\n", " rewards_target = torch.stack([torch.tensor(data[i][\"rewards\"])\n", " for i in range(batch_size)]).to(device).to(dtype)\n", " policy_target = torch.stack([torch.stack(data[i][\"pi\"])\n", " for i in range(batch_size)]).to(device).to(dtype)\n", " value_target = torch.stack([torch.tensor(data[i][\"return\"])\n", " for i in range(batch_size)]).to(device).to(dtype)\n", "\n", " loss = torch.tensor(0).to(device).to(dtype)\n", "\n", " # Initial step\n", " state, p, v = agent.initial_step(representation_in)\n", " policy_loss = ...\n", " \n", " value_loss = ...\n", " loss += (policy_loss + value_coef * value_loss) / 2\n", "\n", " # k unroll steps\n", " for step in range(1, k+1):\n", " step_action = actions[:, step - 1]\n", " state, p, v, rewards = ...\n", " \n", "\n", " pol_loss = ...\n", " \n", " val_loss = ...\n", " rew_loss = ...\n", "\n", " loss += (pol_loss + value_coef * val_loss + reward_coef * rew_loss) / k\n", "\n", " loss.backward()\n", " optimizer.step()\n", "\n", " # -----------------------------------------------------------------\n", " # Live plotting every so often (e.g. 
every 5 episodes).\n", " # This does NOT change the update logic; we’re just visualizing.\n", " # -----------------------------------------------------------------\n", " if (episode + 1) % 5 == 0:\n", " clear_output(True)\n", " plt.figure(figsize=(7,5))\n", " plt.plot(runner.episode_returns, label=f'{search_type} returns')\n", " plt.title(f'{search_type} - Episode {episode+1}')\n", " plt.xlabel('Episode')\n", " plt.ylabel('Return')\n", " plt.legend()\n", " plt.show()\n", "\n", " # Finally, return the entire list of returns so we can compare across runs\n", " return runner.episode_returns\n", "\n", "\n", "# ---------------------------------------------------------------------\n", "# 4) Run training for each search policy and then compare final results.\n", "# Each call uses the same code above but a different `search_type`.\n", "# ---------------------------------------------------------------------\n", "all_results = {}\n", "search_types = ['mcts']\n", "\n", "for s_type in search_types:\n", " returns = train_with_search_policy(s_type)\n", " all_results[s_type] = returns\n", "\n", "# ---------------------------------------------------------------------\n", "# 5) Plot them all on one final chart (optional).\n", "# ---------------------------------------------------------------------\n", "plt.figure(figsize=(9,6))\n", "for s_type in search_types:\n", " plt.plot(all_results[s_type], label=s_type)\n", "plt.title(\"Comparison of MuZero Agents with Different Search Policies\")\n", "plt.xlabel(\"Episode\")\n", "plt.ylabel(\"Return\")\n", "plt.legend()\n", "plt.show()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Comparison of the Search Policies and Results\n", "\n", "Run training for each search policy and then compare the final results. Each call uses the same code above but a different `search_type`."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO\n", "# Your code here" ] } ], "metadata": { "accelerator": "GPU", "colab": { "gpuType": "T4", "provenance": [] }, "kaggle": { "accelerator": "none", "dataSources": [ { "datasetId": 6629936, "sourceId": 10698698, "sourceType": "datasetVersion" } ], "dockerImageVersionId": 30886, "isGpuEnabled": false, "isInternetEnabled": true, "language": "python", "sourceType": "notebook" }, "kernelspec": { "display_name": "ssw", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.11" } }, "nbformat": 4, "nbformat_minor": 0 }