{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration for Colab" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import sys\n", "IN_COLAB = \"google.colab\" in sys.modules\n", "\n", "if IN_COLAB:\n", " !apt install python-opengl\n", " !apt install ffmpeg\n", " !apt install xvfb\n", " !pip install pyvirtualdisplay\n", " from pyvirtualdisplay import Display\n", " \n", " # Start virtual display\n", " dis = Display(visible=0, size=(600, 400))\n", " dis.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 04. TD3\n", "\n", "[Fujimoto, Scott, Herke van Hoof, and David Meger. \"Addressing function approximation error in actor-critic methods.\" arXiv preprint arXiv:1802.09477 2018.](https://arxiv.org/pdf/1802.09477.pdf)\n", "\n", "In value-based reinforcement learning methods, function approximation errors are known to lead to overestimated value estimates and suboptimal policies. However, similar issues with actor-critic methods in continuous control domains have been largely left untouched (See paper for detailed description). To solve this problem, this paper proposes *a clipped Double Q-learning*. In addtion, this paper contains a number of components that address variance reduction.\n", "\n", "The author's modifications are applied to actor-critic method for continuous control, Deep Deterministic Policy Gradient algorithm ([DDPG](https://arxiv.org/pdf/1509.02971.pdf)), to form the *Twin Delayed Deep Deterministic policy gradient algorithm (TD3)*.\n", "\n", "### DDPG\n", "For learning in high-dimentional and continous action spaces, the authors of DDPG combine the actor-critic approach with insights from the success of DQN. Deep DPG(DDPG) is based on the deterministic policy gradient(DPG) algorithm ([Silver et al., 2014](http://proceedings.mlr.press/v32/silver14.pdf)). Please see *03.DDPG.ipynb* for detailed description of DDPG.\n", "\n", "### Double Q-learning\n", "In Double DQN ([Van Hasselt et al., 2016](https://arxiv.org/pdf/1509.06461.pdf)), the authors propose using the target network as one of the value estimates, and obtain a policy by greedy maximization of the current value network rather than the target network. In an actor-critic setting, an analogous update uses the current policy rather than the target policy in the learning target. However, with the slow-changing policy in actor-critic, the current and target networks were too similar to make an independent estimation, and offered little improvement. Instead, the original Double Q-learning formulation can be used, with a pair of actors $(\\pi_{\\phi_1}, \\pi_{\\phi_2})$ and critics $(Q_{\\theta_1}, Q_{\\theta_2})$, where $\\pi_{\\phi_1}$ is optimized with respect to $Q_{\\theta_1}$ and $\\pi_{\\phi_2}$ with respect to $Q_{\\theta_2}$:\n", "\n", "$$\n", "y_1 = r + \\gamma Q_{\\theta'_2} (s' , \\pi_{\\phi_1}(s')) \\\\\n", "y_2 = r + \\gamma Q_{\\theta'_1} (s' , \\pi_{\\phi_2}(s'))\n", "$$\n", "\n", "### A clipped Double Q-learning\n", "The critics are not entirely independent, due to the use of the opposite critic in the learning targets, as well as the same replay buffer. As a result, for some states we will have $Q_{\\theta'_2}(s, \\pi_{\\phi_1}) > Q_{\\theta'_1}(s, \\pi_{\\phi_1})$. This is problematic because $Q_{\\theta'_1}(s, \\pi_{\\phi_1})$ will generally overestimate the true value, and in certain areas of the state space the overestimation will be further exaggerated. To address this problem, the authors propose to take the minimum between the two estimates:\n", "\n", "$$\n", "y_1 = r + \\gamma \\underset{i=1,2}{\\min} Q_{\\theta'_i} (s' , \\pi_{\\phi_1}(s'))\n", "$$\n", "\n", "### Delayed Policy Updates\n", "If policy updates on high-error states cause different behavior, then the policy network should be updated at a lower frequency than the value network, to first minimize error before introducing a policy update. The authors propose delaying policy updates until the value error is as small as possible.\n", "\n", "### Target Policy Smoothing Regularization\n", "When updating the critic, a learning target using a deterministic policy is highly susceptible to in accuracies induced by function approximation error, increasing the variance of the target. This induced variance can be reduced through regularization. The authors propose that fitting the value of a small area around the target action\n", "\n", "$$\n", "y = r + E_\\epsilon [Q_{\\theta'}(s', \\pi_{\\phi '}(s') + \\epsilon],\n", "$$\n", "\n", "would have the benefit of smoothing the value estimate by bootstrapping off of similar state-action value estimates. In practice, this makes below:\n", "\n", "$$\n", "y = r + \\gamma Q_{\\theta '}(s', \\pi_{\\phi '}(s') + \\epsilon), \\\\\n", "\\epsilon \\sim \\text{clip} (\\mathcal(N)(0, \\sigma), -c, c),\n", "$$\n", "\n", "where the added noise is clipped to keep the target close tothe original action." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## import module" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import copy\n", "import os\n", "import random\n", "from typing import Dict, List, Tuple\n", "\n", "import gym\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from IPython.display import clear_output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if torch.backends.cudnn.enabled:\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "seed = 777\n", "torch.manual_seed(seed)\n", "np.random.seed(seed)\n", "random.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Replay buffer\n", "Typically, people implement replay buffers with one of the following three data structures:\n", "\n", "- collections.deque\n", "- list\n", "- numpy.ndarray\n", "\n", "**deque** is very easy to handle once you initialize its maximum length (e.g. deque(maxlen=buffer_size)). However, the indexing operation of deque gets terribly slow as it grows up because it is [internally doubly linked list](https://wiki.python.org/moin/TimeComplexity#collections.deque). On the other hands, **list** is an array, so it is relatively faster than deque when you sample batches at every step. Its amortized cost of Get item is [O(1)](https://wiki.python.org/moin/TimeComplexity#list).\n", "\n", "Last but not least, let's see **numpy.ndarray**. numpy.ndarray is even faster than list due to the fact that it is [a homogeneous array of fixed-size items](https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html#numpy.ndarray), so you can get the benefits of [locality of reference](https://en.wikipedia.org/wiki/Locality_of_reference), . Whereas list is an array of pointers to objects, even when all of them are of the same type.\n", "\n", "Here, we are going to implement a replay buffer using numpy.ndarray.\n", "\n", "Reference: \n", "- [OpenAI spinning-up](https://github.com/openai/spinningup/blob/master/spinup/algos/sac/sac.py#L10)\n", "- [rainbow-is-all-you-need](https://render.githubusercontent.com/view/ipynb?commit=032d11277cf2436853478a69ca5a4aba03202598&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f437572742d5061726b2f7261696e626f772d69732d616c6c2d796f752d6e6565642f303332643131323737636632343336383533343738613639636135613461626130333230323539382f30312e64716e2e6970796e62&nwo=Curt-Park%2Frainbow-is-all-you-need&path=01.dqn.ipynb&repository_id=191133946&repository_type=Repository#Replay-buffer)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class ReplayBuffer:\n", " \"\"\"A simple numpy replay buffer.\"\"\"\n", "\n", " def __init__(self, obs_dim: int, size: int, batch_size: int = 32):\n", " self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.acts_buf = np.zeros([size], dtype=np.float32)\n", " self.rews_buf = np.zeros([size], dtype=np.float32)\n", " self.done_buf = np.zeros(size, dtype=np.float32)\n", " self.max_size, self.batch_size = size, batch_size\n", " self.ptr, self.size, = 0, 0\n", "\n", " def store(\n", " self,\n", " obs: np.ndarray,\n", " act: np.ndarray,\n", " rew: float,\n", " next_obs: np.ndarray,\n", " done: bool,\n", " ):\n", " self.obs_buf[self.ptr] = obs\n", " self.next_obs_buf[self.ptr] = next_obs\n", " self.acts_buf[self.ptr] = act\n", " self.rews_buf[self.ptr] = rew\n", " self.done_buf[self.ptr] = done\n", " self.ptr = (self.ptr + 1) % self.max_size\n", " self.size = min(self.size + 1, self.max_size)\n", "\n", " def sample_batch(self) -> Dict[str, np.ndarray]:\n", " idxs = np.random.choice(self.size, size=self.batch_size, replace=False)\n", " return dict(\n", " obs=self.obs_buf[idxs],\n", " next_obs=self.next_obs_buf[idxs],\n", " acts=self.acts_buf[idxs],\n", " rews=self.rews_buf[idxs],\n", " done=self.done_buf[idxs],\n", " )\n", "\n", " def __len__(self) -> int:\n", " return self.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gaussian Noise\n", "Because the DDPG and the TD3 policy is deterministic, it's not enough to explore a wide variety of actions. In order to facilitate more exploration. TD3 adds Gaussian noise to each action, while DDPG uses Ornstein-Uhlenbeck noise. The TD3 paper states Ornstein-Uhlenbeck noise offered no performance benefits." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class GaussianNoise:\n", " \"\"\"Gaussian Noise.\n", " Taken from https://github.com/vitchyr/rlkit\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " action_dim: int,\n", " min_sigma: float = 1.0,\n", " max_sigma: float = 1.0,\n", " decay_period: int = 1000000,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " self.action_dim = action_dim\n", " self.max_sigma = max_sigma\n", " self.min_sigma = min_sigma\n", " self.decay_period = decay_period\n", "\n", " def sample(self, t: int = 0) -> float:\n", " \"\"\"Get an action with gaussian noise.\"\"\"\n", " sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(\n", " 1.0, t / self.decay_period\n", " )\n", " return np.random.normal(0, sigma, size=self.action_dim)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Network\n", "We are going to use two separated networks for actor and critic. The actor network has three fully connected layers and three non-linearity functions, *ReLU* for hidden layers and *tanh* for the output layer. On the other hand, the critic network has three fully connected layers, but it used two activation functions for hidden layers *ReLU*. Plus, its input sizes of critic network are sum of state sizes and action sizes. One thing to note is that we initialize the final layer's weights and biases so that they are *uniformly distributed.*" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class Actor(nn.Module):\n", " def __init__(self, in_dim: int, out_dim: int, init_w: float = 3e-3):\n", " \"\"\"Initialize.\"\"\"\n", " super(Actor, self).__init__()\n", "\n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, out_dim)\n", "\n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(self, state: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = F.relu(self.hidden1(state))\n", " x = F.relu(self.hidden2(x))\n", " action = self.out(x).tanh()\n", "\n", " return action\n", "\n", "\n", "class Critic(nn.Module):\n", " def __init__(self, in_dim: int, init_w: float = 3e-3):\n", " \"\"\"Initialize.\"\"\"\n", " super(Critic, self).__init__()\n", "\n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, 1)\n", "\n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(self, state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = torch.cat((state, action), dim=-1)\n", " x = F.relu(self.hidden1(x))\n", " x = F.relu(self.hidden2(x))\n", " value = self.out(x)\n", "\n", " return value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TD3Agent\n", "Here is a summary of TD3Agent class.\n", "\n", "| Method | Note |\n", "|--- |--- |\n", "|select_action | select an action from the input state. |\n", "|step | take an action and return the response of the env. |\n", "|update_model | update the model by gradient descent. |\n", "|train | train the agent during num_frames. |\n", "|test | test the agent (1 episode). |\n", "|\\_target_soft_update| soft update from the local model to the target model.|\n", "|\\_plot | plot the training progresses. |" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class TD3Agent:\n", " \"\"\"TD3Agent interacting with environment.\n", " \n", " Attribute:\n", " env (gym.Env): openAI Gym environment\n", " actor1 (nn.Module): target actor model to select actions\n", " actor2 (nn.Module): target actor model to select actions\n", " actor_target1 (nn.Module): actor model to predict next actions\n", " actor_target2 (nn.Module): actor model to predict next actions\n", " actor_optimizer (Optimizer): optimizer for training actor\n", " critic1 (nn.Module): critic model to predict state values\n", " critic2 (nn.Module): critic model to predict state values\n", " critic_target1 (nn.Module): target critic model to predict state values\n", " critic_target2 (nn.Module): target critic model to predict state values \n", " critic_optimizer (Optimizer): optimizer for training critic\n", " memory (ReplayBuffer): replay memory to store transitions\n", " batch_size (int): batch size for sampling\n", " gamma (float): discount factor\n", " tau (float): parameter for soft target update\n", " initial_random_steps (int): initial random action steps\n", " exploration_noise (GaussianNoise): gaussian noise for policy\n", " target_policy_noise (GaussianNoise): gaussian noise for target policy\n", " target_policy_noise_clip (float): clip target gaussian noise\n", " device (torch.device): cpu / gpu\n", " transition (list): temporory storage for the recent transition\n", " policy_update_freq (int): update actor every time critic updates this times\n", " total_step (int): total step numbers\n", " is_test (bool): flag to show the current mode (train / test)\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " env: gym.Env,\n", " memory_size: int,\n", " batch_size: int,\n", " gamma: float = 0.99,\n", " tau: float = 5e-3,\n", " exploration_noise: float = 0.1,\n", " target_policy_noise: float = 0.2,\n", " target_policy_noise_clip: float = 0.5,\n", " initial_random_steps: int = int(1e4),\n", " policy_update_freq: int = 2,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " obs_dim = env.observation_space.shape[0]\n", " action_dim = env.action_space.shape[0]\n", "\n", " self.env = env\n", " self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)\n", " self.batch_size = batch_size\n", " self.gamma = gamma\n", " self.tau = tau\n", " self.initial_random_steps = initial_random_steps\n", " self.policy_update_freq = policy_update_freq\n", "\n", " # device: cpu / gpu\n", " self.device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", " print(self.device)\n", "\n", " # noise\n", " self.exploration_noise = GaussianNoise(\n", " action_dim, exploration_noise, exploration_noise\n", " )\n", " self.target_policy_noise = GaussianNoise(\n", " action_dim, target_policy_noise, target_policy_noise\n", " )\n", " self.target_policy_noise_clip = target_policy_noise_clip\n", "\n", " # networks\n", " self.actor = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target.load_state_dict(self.actor.state_dict())\n", "\n", " self.critic1 = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target1 = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target1.load_state_dict(self.critic1.state_dict())\n", "\n", " self.critic2 = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target2 = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target2.load_state_dict(self.critic2.state_dict())\n", "\n", " # concat critic parameters to use one optim\n", " critic_parameters = list(self.critic1.parameters()) + list(\n", " self.critic2.parameters()\n", " )\n", "\n", " # optimizer\n", " self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)\n", " self.critic_optimizer = optim.Adam(critic_parameters, lr=1e-3)\n", "\n", " # transition to store in memory\n", " self.transition = list()\n", "\n", " # total steps count\n", " self.total_step = 0\n", "\n", " # update step for actor\n", " self.update_step = 0\n", "\n", " # mode: train / test\n", " self.is_test = False\n", "\n", " def select_action(self, state: np.ndarray) -> np.ndarray:\n", " \"\"\"Select an action from the input state.\"\"\"\n", " # if initial random action should be conducted\n", " if self.total_step < self.initial_random_steps and not self.is_test:\n", " selected_action = self.env.action_space.sample()\n", " else:\n", " selected_action = (\n", " self.actor(torch.FloatTensor(state).to(self.device))[0]\n", " .detach()\n", " .cpu()\n", " .numpy()\n", " )\n", "\n", " # add noise for exploration during training\n", " if not self.is_test:\n", " noise = self.exploration_noise.sample()\n", " selected_action = np.clip(\n", " selected_action + noise, -1.0, 1.0\n", " )\n", "\n", " self.transition = [state, selected_action]\n", "\n", " return selected_action\n", "\n", " def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:\n", " \"\"\"Take an action and return the response of the env.\"\"\"\n", " next_state, reward, done, _ = self.env.step(action)\n", "\n", " if not self.is_test:\n", " self.transition += [reward, next_state, done]\n", " self.memory.store(*self.transition)\n", "\n", " return next_state, reward, done\n", "\n", " def update_model(self) -> torch.Tensor:\n", " \"\"\"Update the model by gradient descent.\"\"\"\n", " device = self.device # for shortening the following lines\n", "\n", " samples = self.memory.sample_batch()\n", " states = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " next_states = torch.FloatTensor(samples[\"next_obs\"]).to(device)\n", " actions = torch.FloatTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", " rewards = torch.FloatTensor(samples[\"rews\"].reshape(-1, 1)).to(device)\n", " dones = torch.FloatTensor(samples[\"done\"].reshape(-1, 1)).to(device)\n", " masks = 1 - dones\n", "\n", " # get actions with noise\n", " noise = torch.FloatTensor(self.target_policy_noise.sample()).to(device)\n", " clipped_noise = torch.clamp(\n", " noise, -self.target_policy_noise_clip, self.target_policy_noise_clip\n", " )\n", "\n", " next_actions = (self.actor_target(next_states) + clipped_noise).clamp(\n", " -1.0, 1.0\n", " )\n", "\n", " # min (Q_1', Q_2')\n", " next_values1 = self.critic_target1(next_states, next_actions)\n", " next_values2 = self.critic_target2(next_states, next_actions)\n", " next_values = torch.min(next_values1, next_values2)\n", "\n", " # G_t = r + gamma * v(s_{t+1}) if state != Terminal\n", " # = r otherwise\n", " curr_returns = rewards + self.gamma * next_values * masks\n", " curr_returns = curr_returns.detach()\n", "\n", " # critic loss\n", " values1 = self.critic1(states, actions)\n", " values2 = self.critic2(states, actions)\n", " critic1_loss = F.mse_loss(values1, curr_returns)\n", " critic2_loss = F.mse_loss(values2, curr_returns)\n", "\n", " # train critic\n", " critic_loss = critic1_loss + critic2_loss\n", " self.critic_optimizer.zero_grad()\n", " critic_loss.backward()\n", " self.critic_optimizer.step()\n", "\n", " if self.total_step % self.policy_update_freq == 0:\n", " # train actor\n", " actor_loss = -self.critic1(states, self.actor(states)).mean()\n", "\n", " self.actor_optimizer.zero_grad()\n", " actor_loss.backward()\n", " self.actor_optimizer.step()\n", "\n", " # target update\n", " self._target_soft_update()\n", " else:\n", " actor_loss = torch.zeros(1)\n", "\n", " return actor_loss.data, critic_loss.data\n", "\n", " def train(self, num_frames: int, plotting_interval: int = 200):\n", " \"\"\"Train the agent.\"\"\"\n", " self.is_test = False\n", "\n", " state = self.env.reset()\n", " actor_losses = []\n", " critic_losses = []\n", " scores = []\n", " score = 0\n", "\n", " for self.total_step in range(1, num_frames + 1):\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", "\n", " # if episode ends\n", " if done:\n", " state = env.reset()\n", " scores.append(score)\n", " score = 0\n", "\n", " # if training is ready\n", " if (\n", " len(self.memory) >= self.batch_size\n", " and self.total_step > self.initial_random_steps\n", " ):\n", " actor_loss, critic_loss = self.update_model()\n", " actor_losses.append(actor_loss)\n", " critic_losses.append(critic_loss)\n", "\n", " # plotting\n", " if self.total_step % plotting_interval == 0:\n", " self._plot(self.total_step, scores, actor_losses, critic_losses)\n", "\n", " self.env.close()\n", "\n", " def test(self):\n", " \"\"\"Test the agent.\"\"\"\n", " self.is_test = True\n", "\n", " state = self.env.reset()\n", " done = False\n", " score = 0\n", "\n", " frames = []\n", " while not done:\n", " frames.append(self.env.render(mode=\"rgb_array\"))\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", "\n", " print(\"score: \", score)\n", " self.env.close()\n", "\n", " return frames\n", "\n", " def _target_soft_update(self):\n", " \"\"\"Soft-update: target = tau*local + (1-tau)*target.\"\"\"\n", " tau = self.tau\n", " for t_param, l_param in zip(\n", " self.actor_target.parameters(), self.actor.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", "\n", " for t_param, l_param in zip(\n", " self.critic_target1.parameters(), self.critic1.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", "\n", " for t_param, l_param in zip(\n", " self.critic_target2.parameters(), self.critic2.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", "\n", " def _plot(\n", " self,\n", " frame_idx: int,\n", " scores: List[float],\n", " actor_losses: List[float],\n", " critic_losses: List[float],\n", " ):\n", " \"\"\"Plot the training progresses.\"\"\"\n", " clear_output(True)\n", " plt.figure(figsize=(30, 5))\n", " plt.subplot(131)\n", " plt.title(\"frame %s. score: %s\" % (frame_idx, np.mean(scores[-10:])))\n", " plt.plot(scores)\n", " plt.subplot(132)\n", " plt.title(\"actor_loss\")\n", " plt.plot(actor_losses)\n", " plt.subplot(133)\n", " plt.title(\"critic_loss\")\n", " plt.plot(critic_losses)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment\n", "*ActionNormalizer* is an action wrapper class to normalize the action values ranged in (-1. 1). Thanks to this class, we can make the agent simply select action values within the zero centered range (-1, 1)." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "class ActionNormalizer(gym.ActionWrapper):\n", " \"\"\"Rescale and relocate the actions.\"\"\"\n", "\n", " def action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (-1, 1) to (low, high).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = action * scale_factor + reloc_factor\n", " action = np.clip(action, low, high)\n", "\n", " return action\n", "\n", " def reverse_action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (low, high) to (-1, 1).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = (action - reloc_factor) / scale_factor\n", " action = np.clip(action, -1.0, 1.0)\n", "\n", " return action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see [the code](https://github.com/openai/gym/blob/master/gym/envs/classic_control/pendulum.py) and [configurations](https://github.com/openai/gym/blob/cedecb35e3428985fd4efad738befeb75b9077f1/gym/envs/__init__.py#L81) of Pendulum-v0 from OpenAI's repository." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/khkim/anaconda3/envs/pg-is-all-you-need/lib/python3.6/site-packages/gym/logger.py:30: UserWarning: \u001b[33mWARN: Box bound precision lowered by casting to float32\u001b[0m\n", " warnings.warn(colorize('%s: %s'%('WARN', msg % args), 'yellow'))\n" ] }, { "data": { "text/plain": [ "[777]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# environment\n", "env_id = \"Pendulum-v0\"\n", "env = gym.make(env_id)\n", "env = ActionNormalizer(env)\n", "env.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "cpu\n" ] } ], "source": [ "# parameters\n", "num_frames = 50000\n", "memory_size = 100000\n", "batch_size = 128\n", "initial_random_steps = 10000\n", "\n", "agent = TD3Agent(\n", " env, memory_size, batch_size, initial_random_steps=initial_random_steps\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "scrolled": true }, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "agent.train(num_frames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "Run the trained agent (1 episode)." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "score: -235.42615105825175\n" ] } ], "source": [ "# test\n", "if IN_COLAB:\n", " agent.env = gym.wrappers.Monitor(agent.env, \"videos\", force=True)\n", "frames = agent.test()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Render" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", " \n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Once \n", " Loop \n", " Reflect \n", "
\n", "
\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "if IN_COLAB: # for colab\n", " import base64\n", " import glob\n", " import io\n", " import os\n", "\n", " from IPython.display import HTML, display\n", "\n", " def ipython_show_video(path: str) -> None:\n", " \"\"\"Show a video at `path` within IPython Notebook.\"\"\"\n", " if not os.path.isfile(path):\n", " raise NameError(\"Cannot access: {}\".format(path))\n", "\n", " video = io.open(path, \"r+b\").read()\n", " encoded = base64.b64encode(video)\n", "\n", " display(HTML(\n", " data=\"\"\"\n", " \n", " \"\"\".format(encoded.decode(\"ascii\"))\n", " ))\n", "\n", " list_of_files = glob.glob(\"videos/*.mp4\")\n", " latest_file = max(list_of_files, key=os.path.getctime)\n", " print(latest_file)\n", " ipython_show_video(latest_file)\n", "\n", "else: # for jupyter\n", " from matplotlib import animation\n", " from JSAnimation.IPython_display import display_animation\n", " from IPython.display import display\n", "\n", "\n", " def display_frames_as_gif(frames):\n", " \"\"\"Displays a list of frames as a gif, with controls.\"\"\"\n", " patch = plt.imshow(frames[0])\n", " plt.axis('off')\n", "\n", " def animate(i):\n", " patch.set_data(frames[i])\n", "\n", " anim = animation.FuncAnimation(\n", " plt.gcf(), animate, frames = len(frames), interval=50\n", " )\n", " display(display_animation(anim, default_mode='loop'))\n", "\n", "\n", " # display \n", " display_frames_as_gif(frames)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "pg-is-all-you-need", "language": "python", "name": "pg-is-all-you-need" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }