{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Configurations for Colab" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import sys\n", "IN_COLAB = \"google.colab\" in sys.modules\n", "\n", "if IN_COLAB:\n", " !apt install python-opengl\n", " !apt install ffmpeg\n", " !apt install xvfb\n", " !pip install PyVirtualDisplay==3.0\n", " !pip install gymnasium==0.28.1\n", " from pyvirtualdisplay import Display\n", " \n", " # Start virtual display\n", " dis = Display(visible=0, size=(400, 400))\n", " dis.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 04. Dueling Network\n", "\n", "[Z. Wang et al., \"Dueling Network Architectures for Deep Reinforcement Learning.\" arXiv preprint arXiv:1511.06581, 2015.](https://arxiv.org/pdf/1511.06581.pdf)\n", "\n", "The proposed network architecture, which is named *dueling architecture*, explicitly separates the representation of state values and (state-dependent) action advantages. \n", "\n", "![fig1](https://user-images.githubusercontent.com/14961526/60322956-c2f0b600-99bb-11e9-9ed4-443bd14bc3b0.png)\n", "\n", "The dueling network automatically produces separate estimates of the state value function and advantage function, without any extra supervision. Intuitively, the dueling architecture can learn which states are (or are not) valuable, without having to learn the effect of each action for each state. This is particularly useful in states where its actions do not affect the environment in any relevant way. \n", "\n", "The dueling architecture represents both the value $V(s)$ and advantage $A(s, a)$ functions with a single deep model whose output combines the two to produce a state-action value $Q(s, a)$. Unlike in advantage updating, the representation and algorithm are decoupled by construction.\n", "\n", "$$A^\\pi (s, a) = Q^\\pi (s, a) - V^\\pi (s).$$\n", "\n", "The value function $V$ measures the how good it is to be in a particular state $s$. The $Q$ function, however, measures the the value of choosing a particular action when in this state. Now, using the definition of advantage, we might be tempted to construct the aggregating module as follows:\n", "\n", "$$Q(s, a; \\theta, \\alpha, \\beta) = V (s; \\theta, \\beta) + A(s, a; \\theta, \\alpha),$$\n", "\n", "where $\\theta$ denotes the parameters of the convolutional layers, while $\\alpha$ and $\\beta$ are the parameters of the two streams of fully-connected layers.\n", "\n", "Unfortunately, the above equation is unidentifiable in the sense that given $Q$ we cannot recover $V$ and $A$ uniquely; for example, there are uncountable pairs of $V$ and $A$ that make $Q$ values to zero. To address this issue of identifiability, we can force the advantage function estimator to have zero advantage at the chosen action. That is, we let the last module of the network implement the forward mapping.\n", "\n", "$$\n", "Q(s, a; \\theta, \\alpha, \\beta) = V (s; \\theta, \\beta) + \\big( A(s, a; \\theta, \\alpha) - \\max_{a' \\in |\\mathcal{A}|} A(s, a'; \\theta, \\alpha) \\big).\n", "$$\n", "\n", "This formula guarantees that we can recover the unique $V$ and $A$, but the optimization is not so stable because the advantages have to compensate any change to the optimal action’s advantage. Due to the reason, an alternative module that replaces the max operator with an average is proposed:\n", "\n", "$$\n", "Q(s, a; \\theta, \\alpha, \\beta) = V (s; \\theta, \\beta) + \\big( A(s, a; \\theta, \\alpha) - \\frac{1}{|\\mathcal{A}|} \\sum_{a'} A(s, a'; \\theta, \\alpha) \\big).\n", "$$\n", "\n", "Unlike the max advantage form, in this formula, the advantages only need to change as fast as the mean, so it increases the stability of optimization." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/jinwoo.park/miniforge3/envs/rainbow-is-all-you-need/lib/python3.8/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", " from .autonotebook import tqdm as notebook_tqdm\n" ] } ], "source": [ "import os\n", "from typing import Dict, List, Tuple\n", "\n", "import gymnasium as gym\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from IPython.display import clear_output\n", "from torch.nn.utils import clip_grad_norm_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Replay buffer\n", "\n", "Please see *01.dqn.ipynb* for detailed description." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "class ReplayBuffer:\n", " \"\"\"A simple numpy replay buffer.\"\"\"\n", "\n", " def __init__(self, obs_dim: int, size: int, batch_size: int = 32):\n", " self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.acts_buf = np.zeros([size], dtype=np.float32)\n", " self.rews_buf = np.zeros([size], dtype=np.float32)\n", " self.done_buf = np.zeros(size, dtype=np.float32)\n", " self.max_size, self.batch_size = size, batch_size\n", " self.ptr, self.size, = 0, 0\n", "\n", " def store(\n", " self,\n", " obs: np.ndarray,\n", " act: np.ndarray, \n", " rew: float, \n", " next_obs: np.ndarray, \n", " done: bool,\n", " ):\n", " self.obs_buf[self.ptr] = obs\n", " self.next_obs_buf[self.ptr] = next_obs\n", " self.acts_buf[self.ptr] = act\n", " self.rews_buf[self.ptr] = rew\n", " self.done_buf[self.ptr] = done\n", " self.ptr = (self.ptr + 1) % self.max_size\n", " self.size = min(self.size + 1, self.max_size)\n", "\n", " def sample_batch(self) -> Dict[str, np.ndarray]:\n", " idxs = np.random.choice(self.size, size=self.batch_size, replace=False)\n", " return dict(obs=self.obs_buf[idxs],\n", " next_obs=self.next_obs_buf[idxs],\n", " acts=self.acts_buf[idxs],\n", " rews=self.rews_buf[idxs],\n", " done=self.done_buf[idxs])\n", "\n", " def __len__(self) -> int:\n", " return self.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dueling Network\n", "\n", "Carefully take a look at advantage and value layers separated from feature layer." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class Network(nn.Module):\n", " def __init__(self, in_dim: int, out_dim: int):\n", " \"\"\"Initialization.\"\"\"\n", " super(Network, self).__init__()\n", "\n", " # set common feature layer\n", " self.feature_layer = nn.Sequential(\n", " nn.Linear(in_dim, 128), \n", " nn.ReLU(),\n", " )\n", " \n", " # set advantage layer\n", " self.advantage_layer = nn.Sequential(\n", " nn.Linear(128, 128),\n", " nn.ReLU(),\n", " nn.Linear(128, out_dim),\n", " )\n", "\n", " # set value layer\n", " self.value_layer = nn.Sequential(\n", " nn.Linear(128, 128),\n", " nn.ReLU(),\n", " nn.Linear(128, 1),\n", " )\n", "\n", " def forward(self, x: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " feature = self.feature_layer(x)\n", " \n", " value = self.value_layer(feature)\n", " advantage = self.advantage_layer(feature)\n", "\n", " q = value + advantage - advantage.mean(dim=-1, keepdim=True)\n", " \n", " return q" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DQN + DuelingNet Agent (w/o Double-DQN & PER)\n", "\n", "Here is a summary of DQNAgent class.\n", "\n", "| Method | Note |\n", "| --- | --- |\n", "|select_action | select an action from the input state. |\n", "|step | take an action and return the response of the env. |\n", "|compute_dqn_loss | return dqn loss. |\n", "|update_model | update the model by gradient descent. |\n", "|target_hard_update| hard update from the local model to the target model.|\n", "|train | train the agent during num_frames. |\n", "|test | test the agent (1 episode). |\n", "|plot | plot the training progresses. |\n", "\n", "\n", "Aside from the dueling network architecture, the authors suggest to use Double-DQN and Prioritized Experience Replay as extra components for better performance. However, we don't implement them to simplify the tutorial. There is only one diffrence between DQNAgent here and the one from *01.dqn.ipynb* and that is the usage of clip_grad_norm_ to prevent gradient exploding." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class DQNAgent:\n", " \"\"\"DQN Agent interacting with environment.\n", " \n", " Attribute:\n", " env (gym.Env): openAI Gym environment\n", " memory (ReplayBuffer): replay memory to store transitions\n", " batch_size (int): batch size for sampling\n", " epsilon (float): parameter for epsilon greedy policy\n", " epsilon_decay (float): step size to decrease epsilon\n", " max_epsilon (float): max value of epsilon\n", " min_epsilon (float): min value of epsilon\n", " target_update (int): period for target model's hard update\n", " gamma (float): discount factor\n", " dqn (Network): model to train and select actions\n", " dqn_target (Network): target model to update\n", " optimizer (torch.optim): optimizer for training dqn\n", " transition (list): transition information including\n", " state, action, reward, next_state, done\n", " \"\"\"\n", "\n", " def __init__(\n", " self, \n", " env: gym.Env,\n", " memory_size: int,\n", " batch_size: int,\n", " target_update: int,\n", " epsilon_decay: float,\n", " seed: int,\n", " max_epsilon: float = 1.0,\n", " min_epsilon: float = 0.1,\n", " gamma: float = 0.99,\n", " ):\n", " \"\"\"Initialization.\n", " \n", " Args:\n", " env (gym.Env): openAI Gym environment\n", " memory_size (int): length of memory\n", " batch_size (int): batch size for sampling\n", " target_update (int): period for target model's hard update\n", " epsilon_decay (float): step size to decrease epsilon\n", " lr (float): learning rate\n", " max_epsilon (float): max value of epsilon\n", " min_epsilon (float): min value of epsilon\n", " gamma (float): discount factor\n", " \"\"\"\n", " obs_dim = env.observation_space.shape[0]\n", " action_dim = env.action_space.n\n", " \n", " self.env = env\n", " self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)\n", " self.batch_size = batch_size\n", " self.epsilon = max_epsilon\n", " self.epsilon_decay = epsilon_decay\n", " self.seed = seed\n", " self.max_epsilon = max_epsilon\n", " self.min_epsilon = min_epsilon\n", " self.target_update = target_update\n", " self.gamma = gamma\n", " \n", " # device: cpu / gpu\n", " self.device = torch.device(\n", " \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", " )\n", " print(self.device)\n", "\n", " # networks: dqn, dqn_target\n", " self.dqn = Network(obs_dim, action_dim).to(self.device)\n", " self.dqn_target = Network(obs_dim, action_dim).to(self.device)\n", " self.dqn_target.load_state_dict(self.dqn.state_dict())\n", " self.dqn_target.eval()\n", " \n", " # optimizer\n", " self.optimizer = optim.Adam(self.dqn.parameters())\n", "\n", " # transition to store in memory\n", " self.transition = list()\n", " \n", " # mode: train / test\n", " self.is_test = False\n", "\n", " def select_action(self, state: np.ndarray) -> np.ndarray:\n", " \"\"\"Select an action from the input state.\"\"\"\n", " # epsilon greedy policy\n", " if self.epsilon > np.random.random():\n", " selected_action = self.env.action_space.sample()\n", " else:\n", " selected_action = self.dqn(\n", " torch.FloatTensor(state).to(self.device)\n", " ).argmax()\n", " selected_action = selected_action.detach().cpu().numpy()\n", " \n", " if not self.is_test:\n", " self.transition = [state, selected_action]\n", " \n", " return selected_action\n", "\n", " def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:\n", " \"\"\"Take an action and return the response of the env.\"\"\"\n", " next_state, reward, terminated, truncated, _ = self.env.step(action)\n", " done = terminated or truncated\n", " \n", " if not self.is_test:\n", " self.transition += [reward, next_state, done]\n", " self.memory.store(*self.transition)\n", " \n", " return next_state, reward, done\n", "\n", " def update_model(self) -> torch.Tensor:\n", " \"\"\"Update the model by gradient descent.\"\"\"\n", " samples = self.memory.sample_batch()\n", "\n", " loss = self._compute_dqn_loss(samples)\n", "\n", " self.optimizer.zero_grad()\n", " loss.backward()\n", " # DuelingNet: we clip the gradients to have their norm less than or equal to 10.\n", " clip_grad_norm_(self.dqn.parameters(), 10.0)\n", " self.optimizer.step()\n", "\n", " return loss.item()\n", " \n", " def train(self, num_frames: int, plotting_interval: int = 200):\n", " \"\"\"Train the agent.\"\"\"\n", " self.is_test = False\n", " \n", " state, _ = self.env.reset(seed=self.seed)\n", " update_cnt = 0\n", " epsilons = []\n", " losses = []\n", " scores = []\n", " score = 0\n", "\n", " for frame_idx in range(1, num_frames + 1):\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", "\n", " # if episode ends\n", " if done:\n", " state, _ = self.env.reset(seed=self.seed)\n", " scores.append(score)\n", " score = 0\n", "\n", " # if training is ready\n", " if len(self.memory) >= self.batch_size:\n", " loss = self.update_model()\n", " losses.append(loss)\n", " update_cnt += 1\n", " \n", " # linearly decrease epsilon\n", " self.epsilon = max(\n", " self.min_epsilon, self.epsilon - (\n", " self.max_epsilon - self.min_epsilon\n", " ) * self.epsilon_decay\n", " )\n", " epsilons.append(self.epsilon)\n", " \n", " # if hard update is needed\n", " if update_cnt % self.target_update == 0:\n", " self._target_hard_update()\n", "\n", " # plotting\n", " if frame_idx % plotting_interval == 0:\n", " self._plot(frame_idx, scores, losses, epsilons)\n", " \n", " self.env.close()\n", " \n", " def test(self, video_folder: str) -> None:\n", " \"\"\"Test the agent.\"\"\"\n", " self.is_test = True\n", " \n", " # for recording a video\n", " naive_env = self.env\n", " self.env = gym.wrappers.RecordVideo(self.env, video_folder=video_folder)\n", " \n", " state, _ = self.env.reset(seed=self.seed)\n", " done = False\n", " score = 0\n", " \n", " while not done:\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", " \n", " print(\"score: \", score)\n", " self.env.close()\n", " \n", " # reset\n", " self.env = naive_env\n", "\n", " def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]) -> torch.Tensor:\n", " \"\"\"Return dqn loss.\"\"\"\n", " device = self.device # for shortening the following lines\n", " state = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " next_state = torch.FloatTensor(samples[\"next_obs\"]).to(device)\n", " action = torch.LongTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", " reward = torch.FloatTensor(samples[\"rews\"].reshape(-1, 1)).to(device)\n", " done = torch.FloatTensor(samples[\"done\"].reshape(-1, 1)).to(device)\n", "\n", " # G_t = r + gamma * v(s_{t+1}) if state != Terminal\n", " # = r otherwise\n", " curr_q_value = self.dqn(state).gather(1, action)\n", " next_q_value = self.dqn_target(next_state).max(\n", " dim=1, keepdim=True\n", " )[0].detach()\n", " mask = 1 - done\n", " target = (reward + self.gamma * next_q_value * mask).to(self.device)\n", "\n", " # calculate dqn loss\n", " loss = F.smooth_l1_loss(curr_q_value, target)\n", "\n", " return loss\n", "\n", " def _target_hard_update(self):\n", " \"\"\"Hard update: target <- local.\"\"\"\n", " self.dqn_target.load_state_dict(self.dqn.state_dict())\n", " \n", " def _plot(\n", " self, \n", " frame_idx: int, \n", " scores: List[float], \n", " losses: List[float], \n", " epsilons: List[float],\n", " ):\n", " \"\"\"Plot the training progresses.\"\"\"\n", " clear_output(True)\n", " plt.figure(figsize=(20, 5))\n", " plt.subplot(131)\n", " plt.title('frame %s. score: %s' % (frame_idx, np.mean(scores[-10:])))\n", " plt.plot(scores)\n", " plt.subplot(132)\n", " plt.title('loss')\n", " plt.plot(losses)\n", " plt.subplot(133)\n", " plt.title('epsilons')\n", " plt.plot(epsilons)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment\n", "\n", "You can see the [code](https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/classic_control/cartpole.py) and [configurations](https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/classic_control/cartpole.py#L91) of CartPole-v1 from Farama Gymnasium's repository." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# environment\n", "env = gym.make(\"CartPole-v1\", max_episode_steps=200, render_mode=\"rgb_array\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "seed = 777\n", "\n", "def seed_torch(seed):\n", " torch.manual_seed(seed)\n", " if torch.backends.cudnn.enabled:\n", " torch.cuda.manual_seed(seed)\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "np.random.seed(seed)\n", "seed_torch(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "cpu\n" ] } ], "source": [ "# parameters\n", "num_frames = 10000\n", "memory_size = 1000\n", "batch_size = 32\n", "target_update = 100\n", "epsilon_decay = 1 / 2000\n", "\n", "# train\n", "agent = DQNAgent(env, memory_size, batch_size, target_update, epsilon_decay, seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "agent.train(num_frames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "\n", "Run the trained agent (1 episode)." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Moviepy - Building video /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/dueling/rl-video-episode-0.mp4.\n", "Moviepy - Writing video /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/dueling/rl-video-episode-0.mp4\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "name": "stdout", "output_type": "stream", "text": [ "Moviepy - Done !\n", "Moviepy - video ready /Users/jinwoo.park/Repositories/rainbow-is-all-you-need/videos/dueling/rl-video-episode-0.mp4\n", "score: 200.0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r" ] } ], "source": [ "video_folder=\"videos/dueling\"\n", "agent.test(video_folder=video_folder)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Render" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Played: videos/dueling/rl-video-episode-0.mp4\n" ] } ], "source": [ "import base64\n", "import glob\n", "import io\n", "import os\n", "\n", "from IPython.display import HTML, display\n", "\n", "\n", "def ipython_show_video(path: str) -> None:\n", " \"\"\"Show a video at `path` within IPython Notebook.\"\"\"\n", " if not os.path.isfile(path):\n", " raise NameError(\"Cannot access: {}\".format(path))\n", "\n", " video = io.open(path, \"r+b\").read()\n", " encoded = base64.b64encode(video)\n", "\n", " display(HTML(\n", " data=\"\"\"\n", " \n", " \"\"\".format(encoded.decode(\"ascii\"))\n", " ))\n", "\n", "\n", "def show_latest_video(video_folder: str) -> str:\n", " \"\"\"Show the most recently recorded video from video folder.\"\"\"\n", " list_of_files = glob.glob(os.path.join(video_folder, \"*.mp4\"))\n", " latest_file = max(list_of_files, key=os.path.getctime)\n", " ipython_show_video(latest_file)\n", " return latest_file\n", "\n", "\n", "latest_file = show_latest_video(video_folder=video_folder)\n", "print(\"Played:\", latest_file)" ] } ], "metadata": { "kernelspec": { "display_name": "rainbow-is-all-you-need", "language": "python", "name": "rainbow-is-all-you-need" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 4 }