{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration for Colab" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import sys\n", "IN_COLAB = \"google.colab\" in sys.modules\n", "\n", "if IN_COLAB:\n", " !apt install python-opengl\n", " !apt install ffmpeg\n", " !apt install xvfb\n", " !pip install pyvirtualdisplay\n", " from pyvirtualdisplay import Display\n", " \n", " # Start virtual display\n", " dis = Display(visible=0, size=(600, 400))\n", " dis.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 05. Soft Actor Critic (SAC)\n", "\n", "1. [T. Haarnoja et al., \"Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement Learning with a Stochastic Actor.\" arXiv preprint arXiv:1801.01290, 2018.](https://arxiv.org/pdf/1801.01290.pdf)\n", "2. [T. Haarnoja et al., \"Soft Actor-Critic Algorithms and Applications.\" arXiv preprint arXiv:1812.05905, 2018.](https://arxiv.org/pdf/1812.05905.pdf)\n", "\n", "The main purpose of SAC is to maximize the actor's entropy while maximizing expected reward. We can expect both sample efficient learning and stability because maximizing entropy provides a substantial improvement in exploration and robustness.\n", "\n", "As an extension of standard RL's objective function $\\sum_t \\mathbb{E}_{(s_t, a_t) \\sim \\rho_\\pi} [r(s_t, a_t)]$, let's consider a more general maximum entropy objective which favors stochastic policies by augmenting the objective with the expected entropy of the policy over $\\rho_\\pi (s_t)$:\n", "\n", "$$J(\\pi) = \\sum_{t=0}^T \\mathbb{E}_{(s_t, a_t) \\sim \\rho_\\pi} [r(s_t, a_t) + \\alpha H(\\pi(\\cdot | s_t))].$$\n", "\n", "The temperature parameter $\\alpha$ determines the relative importance of the entropy term against the reward, and thus controls the stochasticity of the optimal policy. By this objective, the policy can explore more widely and capture multiple modes of near-optimal behavior. In conclusion, it considerably improves learning speed over other methods that optimize the conventional RL objective function.\n", "\n", "In the paper, the authors show that *Soft Policy Iteration* guarantees convergence based on a tabular setting (4.1), and they extend it to a practical approximation for large continuous domains (4.2). Firstly, the soft value function is trained to minimize the squared residual error:\n", "\n", "$$J_V (\\psi) = \\mathbb{E}_{s_t \\sim D} \\big[ \\frac{1}{2}(v_\\psi (s_t) - \\mathbb{E}_{a_t \\sim \\pi_\\phi} [Q_\\theta(s_t, a_t) - \\log_{\\pi_\\phi}(a_t | s_t)])^2 \\big],$$\n", "\n", "where $D$ is the distribution of previously sampled states and actions, or a replay buffer. Second, the soft Q-function parameters can be trained to minimize the soft Bellman residual:\n", "\n", "$$J_Q (\\theta) = \\mathbb{E}_{(s_t, a_t) \\sim D} \\big[ \\frac{1}{2} \\big( Q_\\theta(s_t, a_t) - \\hat{Q}(s_t, a_t) \\big)^2 \\big],$$\n", "\n", "with $\\hat{Q}(s_t, a_t) = r(s_t, a_t) + \\gamma \\mathbb{E}_{s_{t+1} \\sim \\rho} [V_{\\tilde{\\psi}} (s_{t+1})].$\n", "\n", "Finally, the policy paramameters can be learned by directly minimizing the following expected KL-divergence:\n", "\n", "$$J_\\pi(\\phi) = \\mathbb{E}_{s_t \\sim D} \\big[ D_{KL} \\big( \\pi_{\\phi} (\\cdot | s_t) \\| \\frac{\\exp(Q_{\\theta}(s_t, \\cdot))}{Z_\\theta(s_t)} \\big) \\big].$$\n", "\n", "We can rewirte the objective as\n", "\n", "$$J_\\pi(\\phi) = \\mathbb{E}_{s_t \\sim D, \\epsilon_t \\sim N} [ \\log_{\\pi_\\phi}(f_\\phi(\\epsilon_t ; s_t) | s_t) - Q_\\theta (s_t, f_\\phi (\\epsilon_t ; s_t))],$$\n", "\n", "where $\\pi_\\phi$ is defined implicitly in terms of $f_\\phi$, and the partition function is independent of $\\phi$ and can thus be omitted.\n", "\n", "One thing to note is that the authors suggest to use two Q-functions to mitigate positive bias in the policy improvement step that is known to degrade performance of value based methods. In particular, we parameterize two Q-functions, with parameters $\\theta_i$, and train them independently to optimize $J_Q(\\theta_i)$. We then use the minimum of the Q-functions for the value gradient and policy gradient. Two Q-functions can significantly speed up training, especially on harder tasks.\n", "\n", "### Can we do better?\n", "\n", "In Soft Actor Critic paper, the experiment of reward scale shows that SAC's performance quite varies depending on reward scaling. In the follow-up paper [2], the authors assume that the temperature parameter $\\alpha$ needs to be adjusted depending on the magnitude of the reward, and they define the soft policy optimization as a constrained problem.\n", "\n", "$$\\max_{\\pi_{0:T}} \\mathbb{E}_{\\rho_\\pi} \\big[ \\sum_{t=0}^T r(s_t, a_t) \\big] \\text{ s.t. } \\mathbb{E}_{(s_t, a_t) \\sim \\rho_\\pi} [-\\log(\\pi_t(a_t|s_t))] \\ge H \\text{ for all } t,$$\n", "\n", "where $H$ is a desired minimum expected entropy. This constrained maximization becomes the following dual problem.\n", "\n", "$$\\min_{a_T \\ge 0} \\max_{\\pi_T} \\mathbb{E} [r(s_T, a_T) - \\alpha_T \\log \\pi(a_t|s_t)] - \\alpha_T H,$$\n", "\n", "where $\\alpha_T$ is the dual variable. Furthermore, it can be rewrited as a optimization problem with regards to $\\alpha$.\n", "\n", "$$J(\\alpha) = \\mathbb{E}_{a_t \\sim \\pi_t} [-\\alpha \\log \\pi_t (a_t | s_t) - \\alpha H].$$\n", "\n", "By optimizing this dual problem, we can adjust the dual variable $\\alpha$, which plays the role of the temperature." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import modules" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import random\n", "from typing import Dict, List, Tuple\n", "\n", "import gym\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from IPython.display import clear_output\n", "from torch.distributions import Normal" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if torch.backends.cudnn.enabled:\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "seed = 777\n", "torch.manual_seed(seed)\n", "np.random.seed(seed)\n", "random.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Replay buffer\n", "Typically, people implement replay buffers with one of the following three data structures:\n", "\n", "- collections.deque\n", "- list\n", "- numpy.ndarray\n", "\n", "**deque** is very easy to handle once you initialize its maximum length (e.g. deque(maxlen=buffer_size)). However, the indexing operation of deque gets terribly slow as it grows up because it is [internally doubly linked list](https://wiki.python.org/moin/TimeComplexity#collections.deque). On the other hands, **list** is an array, so it is relatively faster than deque when you sample batches at every step. Its amortized cost of Get item is [O(1)](https://wiki.python.org/moin/TimeComplexity#list).\n", "\n", "Last but not least, let's see **numpy.ndarray**. numpy.ndarray is even faster than list due to the fact that it is [a homogeneous array of fixed-size items](https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html#numpy.ndarray), so you can get the benefits of [locality of reference](https://en.wikipedia.org/wiki/Locality_of_reference), . Whereas list is an array of pointers to objects, even when all of them are of the same type.\n", "\n", "Here, we are going to implement a replay buffer using numpy.ndarray.\n", "\n", "Reference: \n", "- [OpenAI spinning-up](https://github.com/openai/spinningup/blob/master/spinup/algos/sac/sac.py#L10)\n", "- [rainbow-is-all-you-need](https://render.githubusercontent.com/view/ipynb?commit=032d11277cf2436853478a69ca5a4aba03202598&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f437572742d5061726b2f7261696e626f772d69732d616c6c2d796f752d6e6565642f303332643131323737636632343336383533343738613639636135613461626130333230323539382f30312e64716e2e6970796e62&nwo=Curt-Park%2Frainbow-is-all-you-need&path=01.dqn.ipynb&repository_id=191133946&repository_type=Repository#Replay-buffer)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class ReplayBuffer:\n", " \"\"\"A simple numpy replay buffer.\"\"\"\n", "\n", " def __init__(self, obs_dim: int, size: int, batch_size: int = 32):\n", " \"\"\"Initialize.\"\"\"\n", " self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.acts_buf = np.zeros([size], dtype=np.float32)\n", " self.rews_buf = np.zeros([size], dtype=np.float32)\n", " self.done_buf = np.zeros([size], dtype=np.float32)\n", " self.max_size, self.batch_size = size, batch_size\n", " self.ptr, self.size, = 0, 0\n", "\n", " def store(self,\n", " obs: np.ndarray,\n", " act: np.ndarray, \n", " rew: float, \n", " next_obs: np.ndarray, \n", " done: bool,\n", " ):\n", " \"\"\"Store the transition in buffer.\"\"\"\n", " self.obs_buf[self.ptr] = obs\n", " self.next_obs_buf[self.ptr] = next_obs\n", " self.acts_buf[self.ptr] = act\n", " self.rews_buf[self.ptr] = rew\n", " self.done_buf[self.ptr] = done\n", " self.ptr = (self.ptr + 1) % self.max_size\n", " self.size = min(self.size + 1, self.max_size)\n", "\n", " def sample_batch(self) -> Dict[str, np.ndarray]:\n", " \"\"\"Randomly sample a batch of experiences from memory.\"\"\"\n", " idxs = np.random.choice(self.size, size=self.batch_size, replace=False)\n", " return dict(obs=self.obs_buf[idxs],\n", " next_obs=self.next_obs_buf[idxs],\n", " acts=self.acts_buf[idxs],\n", " rews=self.rews_buf[idxs],\n", " done=self.done_buf[idxs])\n", "\n", " def __len__(self) -> int:\n", " return self.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Network\n", "We are going to use three different networks for policy, Q-function, and V-function. We use two Q-functions to mitigate positive bias and softly update V-function for stable learning. One interesting thing is that the policy network works as Tanh Normal distribution which enforces action bounds. (The details are descibed in Appendix C of [2].)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def init_layer_uniform(layer: nn.Linear, init_w: float = 3e-3) -> nn.Linear:\n", " \"\"\"Init uniform parameters on the single layer.\"\"\"\n", " layer.weight.data.uniform_(-init_w, init_w)\n", " layer.bias.data.uniform_(-init_w, init_w)\n", "\n", " return layer\n", "\n", "\n", "class Actor(nn.Module):\n", " def __init__(\n", " self, \n", " in_dim: int, \n", " out_dim: int,\n", " log_std_min: float = -20,\n", " log_std_max: float = 2,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " super(Actor, self).__init__()\n", " \n", " # set the log std range\n", " self.log_std_min = log_std_min\n", " self.log_std_max = log_std_max\n", " \n", " # set the hidden layers\n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " \n", " # set log_std layer\n", " self.log_std_layer = nn.Linear(128, out_dim)\n", " self.log_std_layer = init_layer_uniform(self.log_std_layer)\n", "\n", " # set mean layer\n", " self.mu_layer = nn.Linear(128, out_dim)\n", " self.mu_layer = init_layer_uniform(self.mu_layer)\n", "\n", " def forward(self, state: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = F.relu(self.hidden1(state))\n", " x = F.relu(self.hidden2(x))\n", " \n", " # get mean\n", " mu = self.mu_layer(x).tanh()\n", " \n", " # get std\n", " log_std = self.log_std_layer(x).tanh()\n", " log_std = self.log_std_min + 0.5 * (\n", " self.log_std_max - self.log_std_min\n", " ) * (log_std + 1)\n", " std = torch.exp(log_std)\n", " \n", " # sample actions\n", " dist = Normal(mu, std)\n", " z = dist.rsample()\n", " \n", " # normalize action and log_prob\n", " # see appendix C of [2]\n", " action = z.tanh()\n", " log_prob = dist.log_prob(z) - torch.log(1 - action.pow(2) + 1e-7)\n", " log_prob = log_prob.sum(-1, keepdim=True)\n", " \n", " return action, log_prob\n", " \n", " \n", "class CriticQ(nn.Module):\n", " def __init__(self, in_dim: int):\n", " \"\"\"Initialize.\"\"\"\n", " super(CriticQ, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, 1)\n", " self.out = init_layer_uniform(self.out)\n", "\n", " def forward(\n", " self, state: torch.Tensor, action: torch.Tensor\n", " ) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = torch.cat((state, action), dim=-1)\n", " x = F.relu(self.hidden1(x))\n", " x = F.relu(self.hidden2(x))\n", " value = self.out(x)\n", " \n", " return value\n", " \n", " \n", "class CriticV(nn.Module):\n", " def __init__(self, in_dim: int):\n", " \"\"\"Initialize.\"\"\"\n", " super(CriticV, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, 1)\n", " self.out = init_layer_uniform(self.out)\n", "\n", " def forward(self, state: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = F.relu(self.hidden1(state))\n", " x = F.relu(self.hidden2(x))\n", " value = self.out(x)\n", " \n", " return value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SAC Agent\n", "Here is a summary of SACAgent class.\n", "\n", "| Method | Note |\n", "|--- |--- |\n", "|select_action | select an action from the input state. |\n", "|step | take an action and return the response of the env. |\n", "|update_model | update the model by gradient descent. |\n", "|train | train the agent during num_frames. |\n", "|test | test the agent (1 episode). |\n", "|\\_target_soft_update| soft update from the local model to the target model.|\n", "|\\_plot | plot the training progresses. |" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class SACAgent:\n", " \"\"\"SAC agent interacting with environment.\n", " \n", " Attrtibutes:\n", " actor (nn.Module): actor model to select actions\n", " actor_optimizer (Optimizer): optimizer for training actor\n", " vf (nn.Module): critic model to predict state values\n", " vf_target (nn.Module): target critic model to predict state values\n", " vf_optimizer (Optimizer): optimizer for training vf\n", " qf_1 (nn.Module): critic model to predict state-action values\n", " qf_2 (nn.Module): critic model to predict state-action values\n", " qf_1_optimizer (Optimizer): optimizer for training qf_1\n", " qf_2_optimizer (Optimizer): optimizer for training qf_2\n", " env (gym.Env): openAI Gym environment\n", " memory (ReplayBuffer): replay memory\n", " batch_size (int): batch size for sampling\n", " gamma (float): discount factor\n", " tau (float): parameter for soft target update\n", " initial_random_steps (int): initial random action steps\n", " policy_update_freq (int): policy update frequency\n", " device (torch.device): cpu / gpu\n", " target_entropy (int): desired entropy used for the inequality constraint\n", " log_alpha (torch.Tensor): weight for entropy\n", " alpha_optimizer (Optimizer): optimizer for alpha\n", " transition (list): temporory storage for the recent transition\n", " total_step (int): total step numbers\n", " is_test (bool): flag to show the current mode (train / test)\n", " \"\"\"\n", " \n", " def __init__(\n", " self,\n", " env: gym.Env,\n", " memory_size: int,\n", " batch_size: int,\n", " gamma: float = 0.99,\n", " tau: float = 5e-3,\n", " initial_random_steps: int = int(1e4),\n", " policy_update_freq: int = 2,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " obs_dim = env.observation_space.shape[0]\n", " action_dim = env.action_space.shape[0]\n", "\n", " self.env = env\n", " self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)\n", " self.batch_size = batch_size\n", " self.gamma = gamma\n", " self.tau = tau\n", " self.initial_random_steps = initial_random_steps\n", " self.policy_update_freq = policy_update_freq\n", "\n", " # device: cpu / gpu\n", " self.device = torch.device(\n", " \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", " )\n", " print(self.device)\n", " \n", " # automatic entropy tuning\n", " self.target_entropy = -np.prod((action_dim,)).item() # heuristic\n", " self.log_alpha = torch.zeros(1, requires_grad=True, device=self.device)\n", " self.alpha_optimizer = optim.Adam([self.log_alpha], lr=3e-4)\n", "\n", " # actor\n", " self.actor = Actor(obs_dim, action_dim).to(self.device)\n", " \n", " # v function\n", " self.vf = CriticV(obs_dim).to(self.device)\n", " self.vf_target = CriticV(obs_dim).to(self.device)\n", " self.vf_target.load_state_dict(self.vf.state_dict())\n", " \n", " # q function\n", " self.qf_1 = CriticQ(obs_dim + action_dim).to(self.device)\n", " self.qf_2 = CriticQ(obs_dim + action_dim).to(self.device)\n", "\n", " # optimizers\n", " self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)\n", " self.vf_optimizer = optim.Adam(self.vf.parameters(), lr=3e-4)\n", " self.qf_1_optimizer = optim.Adam(self.qf_1.parameters(), lr=3e-4)\n", " self.qf_2_optimizer = optim.Adam(self.qf_2.parameters(), lr=3e-4)\n", " \n", " # transition to store in memory\n", " self.transition = list()\n", " \n", " # total steps count\n", " self.total_step = 0\n", "\n", " # mode: train / test\n", " self.is_test = False\n", " \n", " def select_action(self, state: np.ndarray) -> np.ndarray:\n", " \"\"\"Select an action from the input state.\"\"\"\n", " # if initial random action should be conducted\n", " if self.total_step < self.initial_random_steps and not self.is_test:\n", " selected_action = self.env.action_space.sample()\n", " else:\n", " selected_action = self.actor(\n", " torch.FloatTensor(state).to(self.device)\n", " )[0].detach().cpu().numpy()\n", " \n", " self.transition = [state, selected_action]\n", " \n", " return selected_action\n", " \n", " def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool]:\n", " \"\"\"Take an action and return the response of the env.\"\"\"\n", " next_state, reward, done, _ = self.env.step(action)\n", " \n", " if not self.is_test:\n", " self.transition += [reward, next_state, done]\n", " self.memory.store(*self.transition)\n", " \n", " return next_state, reward, done\n", " \n", " def update_model(self) -> Tuple[torch.Tensor, ...]:\n", " \"\"\"Update the model by gradient descent.\"\"\"\n", " device = self.device # for shortening the following lines\n", " \n", " samples = self.memory.sample_batch()\n", " state = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " next_state = torch.FloatTensor(samples[\"next_obs\"]).to(device)\n", " action = torch.FloatTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", " reward = torch.FloatTensor(samples[\"rews\"].reshape(-1, 1)).to(device)\n", " done = torch.FloatTensor(samples[\"done\"].reshape(-1, 1)).to(device)\n", " new_action, log_prob = self.actor(state)\n", " \n", " # train alpha (dual problem)\n", " alpha_loss = (\n", " -self.log_alpha.exp() * (log_prob + self.target_entropy).detach()\n", " ).mean()\n", "\n", " self.alpha_optimizer.zero_grad()\n", " alpha_loss.backward()\n", " self.alpha_optimizer.step()\n", " \n", " alpha = self.log_alpha.exp() # used for the actor loss calculation\n", " \n", " # q function loss\n", " mask = 1 - done\n", " q_1_pred = self.qf_1(state, action)\n", " q_2_pred = self.qf_2(state, action)\n", " v_target = self.vf_target(next_state)\n", " q_target = reward + self.gamma * v_target * mask\n", " qf_1_loss = F.mse_loss(q_1_pred, q_target.detach())\n", " qf_2_loss = F.mse_loss(q_2_pred, q_target.detach())\n", " \n", " # v function loss\n", " v_pred = self.vf(state)\n", " q_pred = torch.min(\n", " self.qf_1(state, new_action), self.qf_2(state, new_action)\n", " )\n", " v_target = q_pred - alpha * log_prob\n", " vf_loss = F.mse_loss(v_pred, v_target.detach())\n", " \n", " if self.total_step % self.policy_update_freq == 0:\n", " # actor loss\n", " advantage = q_pred - v_pred.detach()\n", " actor_loss = (alpha * log_prob - advantage).mean()\n", " \n", " # train actor\n", " self.actor_optimizer.zero_grad()\n", " actor_loss.backward()\n", " self.actor_optimizer.step()\n", " \n", " # target update (vf)\n", " self._target_soft_update()\n", " else:\n", " actor_loss = torch.zeros(1)\n", " \n", " # train Q functions\n", " self.qf_1_optimizer.zero_grad()\n", " qf_1_loss.backward()\n", " self.qf_1_optimizer.step()\n", "\n", " self.qf_2_optimizer.zero_grad()\n", " qf_2_loss.backward()\n", " self.qf_2_optimizer.step()\n", " \n", " qf_loss = qf_1_loss + qf_2_loss\n", "\n", " # train V function\n", " self.vf_optimizer.zero_grad()\n", " vf_loss.backward()\n", " self.vf_optimizer.step()\n", " \n", " return actor_loss.data, qf_loss.data, vf_loss.data, alpha_loss.data\n", " \n", " def train(self, num_frames: int, plotting_interval: int = 200):\n", " \"\"\"Train the agent.\"\"\"\n", " self.is_test = False\n", " \n", " state = self.env.reset()\n", " actor_losses, qf_losses, vf_losses, alpha_losses = [], [], [], []\n", " scores = []\n", " score = 0\n", " \n", " for self.total_step in range(1, num_frames + 1):\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", "\n", " # if episode ends\n", " if done:\n", " state = env.reset()\n", " scores.append(score)\n", " score = 0\n", "\n", " # if training is ready\n", " if (\n", " len(self.memory) >= self.batch_size \n", " and self.total_step > self.initial_random_steps\n", " ):\n", " losses = self.update_model()\n", " actor_losses.append(losses[0])\n", " qf_losses.append(losses[1])\n", " vf_losses.append(losses[2])\n", " alpha_losses.append(losses[3])\n", " \n", " # plotting\n", " if self.total_step % plotting_interval == 0:\n", " self._plot(\n", " self.total_step,\n", " scores, \n", " actor_losses, \n", " qf_losses, \n", " vf_losses, \n", " alpha_losses\n", " )\n", " \n", " self.env.close()\n", " \n", " def test(self):\n", " \"\"\"Test the agent.\"\"\"\n", " self.is_test = True\n", " \n", " state = self.env.reset()\n", " done = False\n", " score = 0\n", " \n", " frames = []\n", " while not done:\n", " frames.append(self.env.render(mode=\"rgb_array\"))\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", " \n", " print(\"score: \", score)\n", " self.env.close()\n", " \n", " return frames\n", " \n", " def _target_soft_update(self):\n", " \"\"\"Soft-update: target = tau*local + (1-tau)*target.\"\"\"\n", " tau = self.tau\n", " \n", " for t_param, l_param in zip(\n", " self.vf_target.parameters(), self.vf.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", " \n", " def _plot(\n", " self, \n", " frame_idx: int, \n", " scores: List[float], \n", " actor_losses: List[float],\n", " qf_losses: List[float],\n", " vf_losses: List[float],\n", " alpha_losses: List[float],\n", " ):\n", " \"\"\"Plot the training progresses.\"\"\"\n", " def subplot(loc: int, title: str, values: List[float]):\n", " plt.subplot(loc)\n", " plt.title(title)\n", " plt.plot(values)\n", "\n", " subplot_params = [\n", " (151, f\"frame {frame_idx}. score: {np.mean(scores[-10:])}\", scores),\n", " (152, \"actor_loss\", actor_losses),\n", " (153, \"qf_loss\", qf_losses),\n", " (154, \"vf_loss\", vf_losses),\n", " (155, \"alpha_loss\", alpha_losses),\n", " ]\n", " \n", " clear_output(True)\n", " plt.figure(figsize=(30, 5))\n", " for loc, title, values in subplot_params:\n", " subplot(loc, title, values)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment\n", "*ActionNormalizer* is an action wrapper class to normalize the action values ranged in (-1. 1). Thanks to this class, we can make the agent simply select action values within the zero centered range (-1, 1)." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class ActionNormalizer(gym.ActionWrapper):\n", " \"\"\"Rescale and relocate the actions.\"\"\"\n", "\n", " def action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (-1, 1) to (low, high).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = action * scale_factor + reloc_factor\n", " action = np.clip(action, low, high)\n", "\n", " return action\n", "\n", " def reverse_action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (low, high) to (-1, 1).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = (action - reloc_factor) / scale_factor\n", " action = np.clip(action, -1.0, 1.0)\n", "\n", " return action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see [the code](https://github.com/openai/gym/blob/master/gym/envs/classic_control/pendulum.py) and [configurations](https://github.com/openai/gym/blob/cedecb35e3428985fd4efad738befeb75b9077f1/gym/envs/__init__.py#L81) of Pendulum-v0 from OpenAI's repository." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# environment\n", "env_id = \"Pendulum-v0\"\n", "env = gym.make(env_id)\n", "env = ActionNormalizer(env)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "cuda\n" ] } ], "source": [ "# parameters\n", "num_frames = 50000\n", "memory_size = 100000\n", "batch_size = 128\n", "initial_random_steps = 10000\n", "\n", "agent = SACAgent(\n", " env, memory_size, batch_size, initial_random_steps=initial_random_steps\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "agent.train(num_frames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "Run the trained agent (1 episode)." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "score: -231.72002778923985\n" ] } ], "source": [ "# test\n", "if IN_COLAB:\n", " agent.env = gym.wrappers.Monitor(agent.env, \"videos\", force=True)\n", "frames = agent.test()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Render" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", " \n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Once \n", " Loop \n", " Reflect \n", "
\n", "
\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "if IN_COLAB: # for colab\n", " import base64\n", " import glob\n", " import io\n", " import os\n", "\n", " from IPython.display import HTML, display\n", "\n", " def ipython_show_video(path: str) -> None:\n", " \"\"\"Show a video at `path` within IPython Notebook.\"\"\"\n", " if not os.path.isfile(path):\n", " raise NameError(\"Cannot access: {}\".format(path))\n", "\n", " video = io.open(path, \"r+b\").read()\n", " encoded = base64.b64encode(video)\n", "\n", " display(HTML(\n", " data=\"\"\"\n", " \n", " \"\"\".format(encoded.decode(\"ascii\"))\n", " ))\n", "\n", " list_of_files = glob.glob(\"videos/*.mp4\")\n", " latest_file = max(list_of_files, key=os.path.getctime)\n", " print(latest_file)\n", " ipython_show_video(latest_file)\n", "\n", "else: # for jupyter\n", " from matplotlib import animation\n", " from JSAnimation.IPython_display import display_animation\n", " from IPython.display import display\n", "\n", "\n", " def display_frames_as_gif(frames):\n", " \"\"\"Displays a list of frames as a gif, with controls.\"\"\"\n", " patch = plt.imshow(frames[0])\n", " plt.axis('off')\n", "\n", " def animate(i):\n", " patch.set_data(frames[i])\n", "\n", " anim = animation.FuncAnimation(\n", " plt.gcf(), animate, frames = len(frames), interval=50\n", " )\n", " display(display_animation(anim, default_mode='loop'))\n", "\n", "\n", " # display \n", " display_frames_as_gif(frames)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "pgayn", "language": "python", "name": "pgayn" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }