{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration for Colab" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import sys\n", "IN_COLAB = \"google.colab\" in sys.modules\n", "\n", "if IN_COLAB:\n", " !apt install python-opengl\n", " !apt install ffmpeg\n", " !apt install xvfb\n", " !pip install pyvirtualdisplay\n", " from pyvirtualdisplay import Display\n", " \n", " # Start virtual display\n", " dis = Display(visible=0, size=(600, 400))\n", " dis.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 06. DDPGfD\n", "\n", "[M. Vecerik et al., \"Leveraging Demonstrations for Deep Reinforcement Learning on Robotics Problems with Sparse Rewards.\" arXiv preprint arXiv:1707.08817, 2017](https://arxiv.org/pdf/1707.08817.pdf)\n", "\n", "Reinforcement learning (RL) offers, in principle, a method to learn control policies from exploration, but the amount of actual exploration required has prohibited its use in real applications. In this paper the authors address this challenge by combining the demonstration and RL paradigms into a single framework which uses demonstrations to guide a deep-RL algorithm. \n", "\n", "The central contribution of this paper is to show that off-policy replay-memory-based RL (e.g. DDPG) is a natural vehicle for injecting demonstration data into sparse-reward tasks and that it obviates the need for reward-shaping. \n", "\n", "The algorithm, called DDPG from Demonstrations (*DDPGfD*), modifies DDPG to take advantage of demonstrations.\n", "\n", "### DDPG\n", "For learning in high-dimensional and continuous action spaces, the authors of DDPG combine the actor-critic approach with insights from the success of DQN. Deep DPG (DDPG) is based on the deterministic policy gradient (DPG) algorithm ([Silver et al., 2014](http://proceedings.mlr.press/v32/silver14.pdf)). 
Please see *03.DDPG.ipynb* for a detailed description of DDPG.\n", "\n", "### Prioritized Experience Replay (PER)\n", "Prioritized experience replay modifies the agent to sample more important transitions from its replay buffer more frequently.\n", "\n", "The probability of sampling a particular transition increases with its priority,\n", "$$\n", "P(i) = \\frac{p_i^{\\alpha}}{\\sum_k p_k^{\\alpha}}\n", "$$\n", "where $p_i$ is the priority of the transition and $\\alpha$ controls how strongly the priorities are used ($\\alpha = 0$ gives uniform sampling). A common choice for the priority is the magnitude of the transition’s TD error.\n", "\n", "DDPGfD uses \n", "$$\n", "p_i = \\delta^{2}_{i} + \\lambda_3 |\\nabla_a Q(s_i, a_i|\\theta^Q)|^2 + \\epsilon + \\epsilon_D,\n", "$$\n", "where:\n", "\n", "- $\\delta_i$ is the last TD error calculated for this transition.\n", "- The second term, $|\\nabla_a Q(s_i, a_i|\\theta^Q)|^2$, represents the loss applied to the actor.\n", "- $\\epsilon$ is a small positive constant to ensure all transitions are sampled with some probability.\n", "- $\\epsilon_D$ is a positive constant for demonstration transitions to increase their probability of getting sampled.\n", "- $\\lambda_3$ is used to weight the contributions.\n", "\n", "One more point. Recall one of the main ideas of DQN: to remove the correlation between observations, it samples uniformly at random from the replay buffer. Prioritized replay introduces bias because it samples experiences in proportion to their priority (e.g. the TD error) rather than uniformly at random. We can correct this bias by using importance-sampling (IS) weights\n", "\n", "$$\n", "w_i = \\big( \\frac{1}{N} \\cdot \\frac{1}{P(i)} \\big)^\\beta\n", "$$\n", "\n", "which fully compensate for the non-uniform probabilities $P(i)$ if $\\beta = 1$. These weights can be folded into the Q-learning update by using $w_i\\delta_i$ instead of $\\delta_i$.\n", "\n", "For details, refer to the PER paper ([T. 
Schaul et al., 2015.](https://arxiv.org/pdf/1511.05952.pdf))\n", "\n", "### A mix of 1-step and n-step returns\n", "A modification for the sparse reward case is to use a mix of 1-step and n-step returns when updating the critic function. Incorporating *n-step returns* helps propagate the Q-values along the trajectories.\n", "\n", "The n-step return has the following form:\n", "$$\n", "R_n = \\sum^{n-1}_{i=0} \\gamma^i r_i + \\gamma^n Q(s'_{n-1}, \\pi(s'_{n-1}); \\theta^{Q'})\n", "$$\n", "\n", "The loss corresponding to this particular rollout is then:\n", "$$\n", "L_n(\\theta^Q) = \\frac{1}{2} (R_n - Q(s, \\pi(s) | \\theta^Q))^2\n", "$$\n", "\n", "### Loss function\n", "The loss function combines the losses mentioned above. Additionally, *L2 regularization* on the parameters of the actor and the critic networks is added to stabilize the final learning performance. Two parameters, $\\lambda_1$ and $\\lambda_2$, are used to weight the contributions.\n", "\n", "$$\n", "L_{Critic}(\\theta ^ Q) = L_1(\\theta^Q) + \\lambda_1 L_n(\\theta^Q) + \\lambda_2 L^{C}_{reg} (\\theta^Q)\n", "$$\n", "$$\n", "\\nabla_{\\theta^{\\pi}} L_{Actor}(\\theta^\\pi) = - \\nabla_{\\theta^{\\pi}} J(\\theta^\\pi) + \\lambda_2 \\nabla_{\\theta^{\\pi}} L^{A}_{reg} (\\theta^\\pi)\n", "$$\n", "\n", "### Pretrain\n", "We make use of the demonstration data to pre-train the agent so that it can perform well in the task from the start of learning, and then continue improving from its own self-generated data.\n", "\n", "Reference: \n", "- [Pseudo code of DDPGfD paper](https://arxiv.org/pdf/1707.08817.pdf)\n", "- [DQfD](https://arxiv.org/pdf/1704.03732)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import modules" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import os\n", "import copy\n", "import random\n", "from collections import deque\n", "from typing import Deque, Dict, List, Tuple\n", "\n", "import gym\n", "import matplotlib.pyplot as plt\n", 
"import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "\n", "from IPython.display import clear_output\n", "\n", "if IN_COLAB and not os.path.exists(\"segment_tree.py\") and not os.path.exists(\"demo.pkl\"):\n", " # download segment tree module\n", " !wget https://raw.githubusercontent.com/mrsyee/pg-is-all-you-need/master/segment_tree.py\n", " # download demo.pkl\n", " !wget https://raw.githubusercontent.com/mrsyee/pg-is-all-you-need/master/demo.pkl\n", " \n", "from segment_tree import MinSegmentTree, SumSegmentTree" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if torch.backends.cudnn.enabled:\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "seed = 777\n", "torch.manual_seed(seed)\n", "np.random.seed(seed)\n", "random.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Replay buffer for N-step learning with demonstration\n", "\n", "There are a little bit changes in Replay buffer for N-step learning with demonstration.\n", "\n", "First, we use `deque` to store the most recent n-step transitions.\n", "\n", "```python\n", " self.n_step_buffer = deque(maxlen=n_step)\n", "```\n", "\n", "You can see it doesn't actually store a transition in the buffer, unless `n_step_buffer` is full.\n", "\n", "```\n", " # in store method\n", " if len(self.n_step_buffer) < self.n_step:\n", " return ()\n", "```\n", "\n", "When the length of `n_step_buffer` becomes equal to N, it eventually stores the N-step transition, which is calculated by `get_n_step_info` method (reference `util.py`). Furthermore, there are additional implementations for saving loaded demos. 
(Please see *03.DDPG.ipynb* for detailed description of the basic replay buffer.)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class ReplayBuffer:\n", " \"\"\"A numpy replay buffer with demonstrations.\"\"\"\n", "\n", " def __init__(\n", " self, \n", " obs_dim: int, \n", " size: int, \n", " batch_size: int = 32, \n", " gamma: float = 0.99,\n", " demo: list = None,\n", " n_step: int = 1, \n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.acts_buf = np.zeros([size], dtype=np.float32)\n", " self.rews_buf = np.zeros([size], dtype=np.float32)\n", " self.done_buf = np.zeros([size], dtype=np.float32)\n", " self.max_size, self.batch_size = size, batch_size\n", " self.ptr, self.size = 0, 0\n", " \n", " # for N-step Learning\n", " self.n_step_buffer = deque(maxlen=n_step)\n", " self.n_step = n_step\n", " self.gamma = gamma\n", " \n", " # for demonstration\n", " self.demo_size = len(demo) if demo else 0\n", " self.demo = demo\n", " \n", " if self.demo:\n", " self.ptr += self.demo_size\n", " self.size += self.demo_size\n", " for ptr, d in enumerate(self.demo):\n", " state, action, reward, next_state, done = d\n", " self.obs_buf[ptr] = state\n", " self.acts_buf[ptr] = np.array(action)\n", " self.rews_buf[ptr] = reward\n", " self.next_obs_buf[ptr] = next_state\n", " self.done_buf[ptr] = done\n", "\n", " def store(\n", " self,\n", " obs: np.ndarray,\n", " act: np.ndarray, \n", " rew: float, \n", " next_obs: np.ndarray, \n", " done: bool,\n", " ) -> Tuple[np.ndarray, np.ndarray, float, np.ndarray, bool]:\n", " \"\"\"Store the transition in buffer.\"\"\"\n", " transition = (obs, act, rew, next_obs, done)\n", " self.n_step_buffer.append(transition)\n", " \n", " # single step transition is not ready\n", " if len(self.n_step_buffer) < self.n_step:\n", " return ()\n", " \n", " # make a n-step 
transition\n", " rew, next_obs, done = self._get_n_step_info()\n", " obs, act = self.n_step_buffer[0][:2]\n", " \n", " self.obs_buf[self.ptr] = obs\n", " self.next_obs_buf[self.ptr] = next_obs\n", " self.acts_buf[self.ptr] = act\n", " self.rews_buf[self.ptr] = rew\n", " self.done_buf[self.ptr] = done\n", " \n", " self.ptr += 1\n", " self.ptr = self.demo_size if self.ptr % self.max_size == 0 else self.ptr\n", " self.size = min(self.size + 1, self.max_size)\n", " \n", " return self.n_step_buffer[0]\n", "\n", " def sample_batch(self, indices: List[int] = None) -> Dict[str, np.ndarray]:\n", " \"\"\"Randomly sample a batch of experiences from memory.\"\"\"\n", " assert len(self) >= self.batch_size\n", " \n", " if indices is None:\n", " indices = np.random.choice(\n", " len(self), size=self.batch_size, replace=False\n", " )\n", " \n", " return dict(\n", " obs=self.obs_buf[indices],\n", " next_obs=self.next_obs_buf[indices],\n", " acts=self.acts_buf[indices],\n", " rews=self.rews_buf[indices],\n", " done=self.done_buf[indices],\n", " # for N-step learning\n", " indices=indices,\n", " )\n", " \n", " def _get_n_step_info(self) -> Tuple[np.int64, np.ndarray, bool]:\n", " \"\"\"Return n step rew, next_obs, and done.\"\"\"\n", " # info of the last transition\n", " rew, next_obs, done = self.n_step_buffer[-1][-3:]\n", "\n", " for transition in reversed(list(self.n_step_buffer)[:-1]):\n", " r, n_o, d = transition[-3:]\n", "\n", " rew = r + self.gamma * rew * (1 - d)\n", " next_obs, done = (n_o, d) if d else (next_obs, done)\n", "\n", " return rew, next_obs, done\n", "\n", " def __len__(self) -> int:\n", " return self.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prioritized replay Buffer with demonstration\n", "\n", "The key concept of PER's implementation is *Segment Tree*. It efficiently stores and samples transitions while managing the priorities of them (reference `segment_tree.py`). We recommend you understand how it works before you move on. 
Here are references for you:\n", "\n", "- In Korean: https://mrsyee.github.io/rl/2019/01/25/PER-sumtree/\n", "- In English: https://www.geeksforgeeks.org/segment-tree-set-1-sum-of-given-range/\n", "\n", "In addition, `epsilon_d` is a positive constant for demonstration transitions to increase their probability of getting sampled." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class PrioritizedReplayBuffer(ReplayBuffer):\n", " \"\"\"Prioritized Replay buffer with demonstrations.\"\"\"\n", " \n", " def __init__(\n", " self, \n", " obs_dim: int,\n", " size: int, \n", " batch_size: int = 32, \n", " gamma: float = 0.99,\n", " alpha: float = 0.6,\n", " epsilon_d: float = 1.0,\n", " demo: list = None,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " assert alpha >= 0\n", " \n", " super(PrioritizedReplayBuffer, self).__init__(\n", " obs_dim, size, batch_size, gamma, demo, n_step=1 \n", " )\n", " self.max_priority, self.tree_ptr = 1.0, 0\n", " self.alpha = alpha\n", " self.epsilon_d = epsilon_d\n", " \n", " # capacity must be positive and a power of 2.\n", " tree_capacity = 1\n", " while tree_capacity < self.max_size:\n", " tree_capacity *= 2\n", "\n", " self.sum_tree = SumSegmentTree(tree_capacity)\n", " self.min_tree = MinSegmentTree(tree_capacity)\n", " \n", " # for init priority of demo\n", " self.tree_ptr = self.demo_size\n", " for i in range(self.demo_size):\n", " self.sum_tree[i] = self.max_priority ** self.alpha\n", " self.min_tree[i] = self.max_priority ** self.alpha\n", " \n", " def store(\n", " self, \n", " obs: np.ndarray, \n", " act: np.ndarray, \n", " rew: float, \n", " next_obs: np.ndarray, \n", " done: bool\n", " ):\n", " \"\"\"Store experience and priority.\"\"\"\n", " transition = super().store(obs, act, rew, next_obs, done)\n", " \n", " if transition:\n", " self.sum_tree[self.tree_ptr] = self.max_priority ** self.alpha\n", " self.min_tree[self.tree_ptr] = self.max_priority ** self.alpha\n", "\n", " self.tree_ptr += 
1\n", " if self.tree_ptr % self.max_size == 0:\n", " self.tree_ptr = self.demo_size\n", " \n", " return transition\n", "\n", " def sample_batch(self, beta: float = 0.4) -> Dict[str, np.ndarray]:\n", " \"\"\"Sample a batch of experiences.\"\"\"\n", " assert len(self) >= self.batch_size\n", " assert beta > 0\n", " \n", " indices = self._sample_proportional()\n", " \n", " obs = self.obs_buf[indices]\n", " next_obs = self.next_obs_buf[indices]\n", " acts = self.acts_buf[indices]\n", " rews = self.rews_buf[indices]\n", " done = self.done_buf[indices]\n", " weights = np.array([self._calculate_weight(i, beta) for i in indices])\n", " epsilon_d = np.array(\n", " [self.epsilon_d if i < self.demo_size else 0.0 for i in indices]\n", " )\n", " \n", " return dict(\n", " obs=obs,\n", " next_obs=next_obs,\n", " acts=acts,\n", " rews=rews,\n", " done=done,\n", " weights=weights,\n", " epsilon_d=epsilon_d,\n", " indices=indices,\n", " )\n", " \n", " def update_priorities(self, indices: List[int], priorities: np.ndarray):\n", " \"\"\"Update priorities of sampled transitions.\"\"\"\n", " assert len(indices) == len(priorities)\n", "\n", " for idx, priority in zip(indices, priorities):\n", " assert priority > 0\n", " assert 0 <= idx < len(self)\n", "\n", " self.sum_tree[idx] = priority ** self.alpha\n", " self.min_tree[idx] = priority ** self.alpha\n", "\n", " self.max_priority = max(self.max_priority, priority)\n", " \n", " def _sample_proportional(self) -> List[int]:\n", " \"\"\"Sample indices based on proportions.\"\"\"\n", " indices = []\n", " p_total = self.sum_tree.sum(0, len(self) - 1)\n", " segment = p_total / self.batch_size\n", " \n", " for i in range(self.batch_size):\n", " a = segment * i\n", " b = segment * (i + 1)\n", " upperbound = random.uniform(a, b)\n", " idx = self.sum_tree.retrieve(upperbound)\n", " indices.append(idx)\n", " \n", " return indices\n", " \n", " def _calculate_weight(self, idx: int, beta: float):\n", " \"\"\"Calculate the weight of the experience at 
idx.\"\"\"\n", " # get max weight\n", " p_min = self.min_tree.min() / self.sum_tree.sum()\n", " max_weight = (p_min * len(self)) ** (-beta)\n", " \n", " # calculate weights\n", " p_sample = self.sum_tree[idx] / self.sum_tree.sum()\n", " weight = (p_sample * len(self)) ** (-beta)\n", " weight = weight / max_weight\n", " \n", " return weight" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## OU Noise\n", "*Ornstein-Uhlenbeck* process generates temporally correlated exploration, and it effectively copes with physical control problems of inertia.\n", "\n", "$$\n", "dx_t = \\theta(\\mu - x_t) dt + \\sigma dW_t\n", "$$\n", "\n", "Reference: \n", "- [Udacity github](https://github.com/udacity/deep-reinforcement-learning/blob/master/ddpg-pendulum/ddpg_agent.py)\n", "- [Wiki](https://en.wikipedia.org/wiki/Ornstein%E2%80%93Uhlenbeck_process)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class OUNoise:\n", " \"\"\"Ornstein-Uhlenbeck process.\n", " Taken from Udacity deep-reinforcement-learning github repository:\n", " https://github.com/udacity/deep-reinforcement-learning/blob/master/\n", " ddpg-pendulum/ddpg_agent.py\n", " \"\"\"\n", "\n", " def __init__(\n", " self, \n", " size: int, \n", " mu: float = 0.0, \n", " theta: float = 0.15, \n", " sigma: float = 0.2,\n", " ):\n", " \"\"\"Initialize parameters and noise process.\"\"\"\n", " self.state = np.float64(0.0)\n", " self.mu = mu * np.ones(size)\n", " self.theta = theta\n", " self.sigma = sigma\n", " self.reset()\n", "\n", " def reset(self):\n", " \"\"\"Reset the internal state (= noise) to mean (mu).\"\"\"\n", " self.state = copy.copy(self.mu)\n", "\n", " def sample(self) -> np.ndarray:\n", " \"\"\"Update internal state and return it as a noise sample.\"\"\"\n", " x = self.state\n", " dx = self.theta * (self.mu - x) + self.sigma * np.array(\n", " [random.random() for _ in range(len(x))]\n", " )\n", " self.state = x + dx\n", " return self.state" ] }, { 
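"cell_type": "markdown", "metadata": {}, "source": [ "As a rough sketch of how the equation above relates to the code (the step size $\\Delta t$ and the symbol $\\varepsilon_t$ are our notation, not the notebook's), an Euler-Maruyama step of the process reads\n", "\n", "$$\n", "x_{t+\\Delta t} = x_t + \\theta (\\mu - x_t) \\Delta t + \\sigma \\sqrt{\\Delta t} \\, \\varepsilon_t, \\qquad \\varepsilon_t \\sim \\mathcal{N}(0, 1)\n", "$$\n", "\n", "`OUNoise.sample` has the same shape if we read it with $\\Delta t = 1$. Note, however, that it draws $\\varepsilon_t$ from `random.random()`, which is uniform on $[0, 1)$ rather than standard normal, so the noise it produces has a positive mean instead of being centered on $\\mu$." ] }, { 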
"cell_type": "markdown", "metadata": {}, "source": [ "## Network\n", "We are going to use two separated networks for actor and critic. The actor network has three fully connected layers and three non-linearity functions, *ReLU* for hidden layers and *tanh* for the output layer. On the other hand, the critic network has three fully connected layers, but it used two activation functions for hidden layers *ReLU*. Plus, its input sizes of critic network are sum of state sizes and action sizes. One thing to note is that we initialize the final layer's weights and biases so that they are *uniformly distributed.*" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class Actor(nn.Module):\n", " def __init__(\n", " self, \n", " in_dim: int, \n", " out_dim: int,\n", " init_w: float = 3e-3,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " super(Actor, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, out_dim)\n", " \n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(self, state: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = F.relu(self.hidden1(state))\n", " x = F.relu(self.hidden2(x))\n", " action = self.out(x).tanh()\n", " \n", " return action\n", " \n", " \n", "class Critic(nn.Module):\n", " def __init__(\n", " self, \n", " in_dim: int, \n", " init_w: float = 3e-3,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " super(Critic, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, 1)\n", " \n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(\n", " self, state: torch.Tensor, action: torch.Tensor\n", " ) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", 
" x = torch.cat((state, action), dim=-1)\n", " x = F.relu(self.hidden1(x))\n", " x = F.relu(self.hidden2(x))\n", " value = self.out(x)\n", " \n", " return value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DDPGfD Agent\n", "Here is a summary of DDPGfDAgent class.\n", "\n", "| Method | Note |\n", "|--- |--- |\n", "|select_action | select an action from the input state. |\n", "|step | take an action and return the response of the env. |\n", "|update_model | update the model by gradient descent. |\n", "|train | train the agent during num_frames. |\n", "|test | test the agent (1 episode). |\n", "|\\_pretrain |pretraining steps.|\n", "|\\_get_critic_loss | return element-wise critic loss. |\n", "|\\_target_soft_update| soft update from the local model to the target model.|\n", "|\\_get_n_step_info_from_demo | return 1 step and n step demos. |" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "class DDPGfDAgent:\n", " \"\"\"DDPGfDAgent interacting with environment.\n", " \n", " Attribute:\n", " env (gym.Env): openAI Gym environment\n", " actor (nn.Module): target actor model to select actions\n", " actor_target (nn.Module): actor model to predict next actions\n", " actor_optimizer (Optimizer): optimizer for training actor\n", " critic (nn.Module): critic model to predict state values\n", " critic_target (nn.Module): target critic model to predict state values\n", " critic_optimizer (Optimizer): optimizer for training critic\n", " demo (list): demonstration\n", " memory (ReplayBuffer): replay memory to store transitions\n", " batch_size (int): batch size for sampling\n", " gamma (float): discount factor\n", " tau (float): parameter for soft target update\n", " initial_random_steps (int): initial random action steps\n", " pretrain_step (int): the number of step for pre-training\n", " n_step (int): the number of multi step\n", " use_n_step (bool): whether to use n_step memory\n", " prior_eps (float): guarantees 
every transition can be sampled\n", " lambda1 (float): n-step return weight\n", " lambda2 (float): l2 regularization weight\n", " lambda3 (float): weight of the actor loss term in the priority\n", " noise (OUNoise): noise generator for exploration\n", " device (torch.device): cpu / gpu\n", " transition (list): temporary storage for the recent transition\n", " total_step (int): total number of steps\n", " is_test (bool): flag to show the current mode (train / test)\n", " \"\"\"\n", " def __init__(\n", " self,\n", " env: gym.Env,\n", " memory_size: int,\n", " batch_size: int,\n", " ou_noise_theta: float,\n", " ou_noise_sigma: float,\n", " demo: list,\n", " pretrain_step: int,\n", " gamma: float = 0.99,\n", " tau: float = 5e-3,\n", " initial_random_steps: int = int(1e4),\n", " # PER parameters\n", " alpha: float = 0.3,\n", " beta: float = 1.0,\n", " prior_eps: float = 1e-6,\n", " # N-step Learning\n", " n_step: int = 3,\n", " # loss parameters\n", " lambda1: float = 1.0, # N-step return weight\n", " lambda2: float = 1e-4, # l2 regularization weight\n", " lambda3: float = 1.0, # weight of the actor loss term in the priority\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " obs_dim = env.observation_space.shape[0]\n", " action_dim = env.action_space.shape[0]\n", "\n", " self.env = env\n", " self.batch_size = batch_size\n", " self.pretrain_step = pretrain_step\n", " self.gamma = gamma\n", " self.tau = tau\n", " self.initial_random_steps = initial_random_steps\n", " self.lambda1 = lambda1\n", " self.lambda3 = lambda3\n", " \n", " self.demo = demo\n", " demos_1_step, demos_n_step = [], []\n", " if self.demo:\n", " demos_1_step, demos_n_step = self._get_n_step_info_from_demo(\n", " demo, n_step\n", " )\n", " \n", " # PER\n", " # memory for 1-step Learning\n", " self.beta = beta\n", " self.prior_eps = prior_eps\n", " self.memory = PrioritizedReplayBuffer(\n", " obs_dim, memory_size, batch_size, gamma, alpha, demo=demos_1_step\n", " )\n", " \n", " # memory for N-step Learning\n", " self.use_n_step = 
True if n_step > 1 else False\n", " if self.use_n_step:\n", " self.n_step = n_step\n", " self.memory_n = ReplayBuffer(\n", " obs_dim, \n", " memory_size, \n", " batch_size, \n", " gamma, \n", " demos_n_step, \n", " self.n_step\n", " )\n", " \n", " # noise\n", " self.noise = OUNoise(\n", " action_dim,\n", " theta=ou_noise_theta,\n", " sigma=ou_noise_sigma,\n", " )\n", "\n", " # device: cpu / gpu\n", " self.device = torch.device(\n", " \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", " )\n", " print(self.device)\n", "\n", " # networks\n", " self.actor = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target.load_state_dict(self.actor.state_dict())\n", " \n", " self.critic = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target.load_state_dict(self.critic.state_dict())\n", "\n", " # optimizer\n", " self.actor_optimizer = optim.Adam(\n", " self.actor.parameters(),\n", " lr=3e-4,\n", " weight_decay=lambda2,\n", " )\n", " self.critic_optimizer = optim.Adam(\n", " self.critic.parameters(),\n", " lr=1e-3,\n", " weight_decay=lambda2,\n", " )\n", " \n", " # transition to store in memory\n", " self.transition = list()\n", " \n", " # total steps count\n", " self.total_step = 0\n", "\n", " # mode: train / test\n", " self.is_test = False\n", " \n", " def select_action(self, state: np.ndarray) -> np.ndarray:\n", " \"\"\"Select an action from the input state.\"\"\"\n", " # if initial random action should be conducted\n", " if self.total_step < self.initial_random_steps and not self.is_test:\n", " selected_action = self.env.action_space.sample()\n", " else:\n", " selected_action = self.actor(\n", " torch.FloatTensor(state).to(self.device)\n", " ).detach().cpu().numpy()\n", " \n", " # add noise for exploration during training\n", " if not self.is_test:\n", " noise = self.noise.sample()\n", " selected_action = 
np.clip(selected_action + noise, -1.0, 1.0)\n", " \n", " self.transition = [state, selected_action]\n", " \n", " return selected_action\n", " \n", " def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:\n", " \"\"\"Take an action and return the response of the env.\"\"\"\n", " next_state, reward, done, _ = self.env.step(action)\n", " \n", " if not self.is_test:\n", " self.transition += [reward, next_state, done]\n", " \n", " # N-step transition\n", " transition = self.transition\n", " if self.use_n_step:\n", " transition = self.memory_n.store(*self.transition)\n", "\n", " # add a single step transition\n", " if transition:\n", " self.memory.store(*transition)\n", " \n", " return next_state, reward, done\n", " \n", " def update_model(self) -> Tuple[torch.Tensor, ...]:\n", " \"\"\"Update the model by gradient descent.\"\"\"\n", " device = self.device # for shortening the following lines\n", " \n", " samples = self.memory.sample_batch(self.beta) \n", " state = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " action = torch.FloatTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", "\n", " weights = torch.FloatTensor(\n", " samples[\"weights\"].reshape(-1, 1)\n", " ).to(device)\n", " epsilon_d = samples[\"epsilon_d\"]\n", " indices = samples[\"indices\"]\n", " \n", " # train critic\n", " # 1-step loss\n", " critic_loss_element_wise = self._get_critic_loss(samples, self.gamma)\n", " critic_loss = torch.mean(critic_loss_element_wise * weights)\n", " \n", " # n-step loss\n", " if self.use_n_step:\n", " samples_n = self.memory_n.sample_batch(indices)\n", " n_gamma = self.gamma ** self.n_step\n", " critic_loss_n_element_wise = self._get_critic_loss(\n", " samples_n, n_gamma\n", " )\n", " \n", " # to update loss and priorities\n", " critic_loss_element_wise += (\n", " critic_loss_n_element_wise * self.lambda1\n", " )\n", " critic_loss = torch.mean(critic_loss_element_wise * weights) \n", " \n", " self.critic_optimizer.zero_grad()\n", " 
critic_loss.backward()\n", " self.critic_optimizer.step()\n", " \n", " # train actor\n", " actor_loss_element_wise = -self.critic(state, self.actor(state))\n", " actor_loss = torch.mean(actor_loss_element_wise * weights)\n", " \n", " self.actor_optimizer.zero_grad()\n", " actor_loss.backward()\n", " self.actor_optimizer.step()\n", " \n", " # target update\n", " self._target_soft_update()\n", " \n", " # PER: update priorities\n", " new_priorities = critic_loss_element_wise\n", " new_priorities += self.lambda3 * actor_loss_element_wise.pow(2)\n", " new_priorities += self.prior_eps\n", " new_priorities = new_priorities.data.cpu().numpy().squeeze()\n", " new_priorities += epsilon_d\n", " self.memory.update_priorities(indices, new_priorities)\n", " \n", " # check the number of sampling demos\n", " demo_idxs = np.where(epsilon_d != 0.0)\n", " n_demo = demo_idxs[0].size\n", " \n", " return actor_loss.data, critic_loss.data, n_demo\n", " \n", " def _pretrain(self) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:\n", " \"\"\"Pretraining steps.\"\"\"\n", " actor_losses = []\n", " critic_losses = []\n", " print(\"Pre-Train %d step.\" % self.pretrain_step)\n", " for _ in range(1, self.pretrain_step + 1):\n", " actor_loss, critic_loss, _ = self.update_model()\n", " actor_losses.append(actor_loss.data)\n", " critic_losses.append(critic_loss.data)\n", " print(\"Pre-Train Complete!\\n\")\n", " return actor_losses, critic_losses\n", " \n", " def train(self, num_frames: int, plotting_interval: int = 200):\n", " \"\"\"Train the agent.\"\"\"\n", " self.is_test = False\n", " \n", " state = self.env.reset()\n", " actor_losses, critic_losses, n_demo_list, scores = [], [], [], []\n", " score = 0\n", " \n", " if self.demo:\n", " output = self._pretrain()\n", " actor_losses.extend(output[0])\n", " critic_losses.extend(output[1])\n", " \n", " for self.total_step in range(1, num_frames + 1):\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", 
" state = next_state\n", " score += reward\n", " \n", " # PER: increase beta\n", " fraction = min(self.total_step / num_frames, 1.0)\n", " self.beta = self.beta + fraction * (1.0 - self.beta)\n", "\n", " # if episode ends\n", " if done: \n", " state = env.reset()\n", " scores.append(score)\n", " score = 0\n", "\n", " # if training is ready\n", " if (\n", " len(self.memory) >= self.batch_size \n", " and self.total_step > self.initial_random_steps\n", " ):\n", " actor_loss, critic_loss, n_demo = self.update_model()\n", " actor_losses.append(actor_loss)\n", " critic_losses.append(critic_loss)\n", " n_demo_list.append(n_demo)\n", " \n", " # plotting\n", " if self.total_step % plotting_interval == 0:\n", " self._plot(\n", " self.total_step, \n", " scores, \n", " actor_losses, \n", " critic_losses,\n", " n_demo_list,\n", " )\n", " \n", " self.env.close()\n", " \n", " def test(self):\n", " \"\"\"Test the agent.\"\"\"\n", " self.is_test = True\n", " \n", " state = self.env.reset()\n", " done = False\n", " score = 0\n", " \n", " frames = []\n", " while not done:\n", " frames.append(self.env.render(mode=\"rgb_array\"))\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", " \n", " print(\"score: \", score)\n", " self.env.close()\n", " \n", " return frames\n", " \n", " def _get_critic_loss(\n", " self, samples: Dict[str, np.ndarray], gamma: float\n", " ) -> torch.Tensor:\n", " \"\"\"Return element-wise critic loss.\"\"\"\n", " device = self.device # for shortening the following lines\n", " \n", " state = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " next_state = torch.FloatTensor(samples[\"next_obs\"]).to(device)\n", " action = torch.FloatTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", " reward = torch.FloatTensor(samples[\"rews\"].reshape(-1, 1)).to(device)\n", " done = torch.FloatTensor(samples[\"done\"].reshape(-1, 1)).to(device)\n", " \n", " masks = 1 - done\n", " 
next_action = self.actor_target(next_state)\n", " next_value = self.critic_target(next_state, next_action)\n", " curr_return = reward + gamma * next_value * masks\n", " curr_return = curr_return.to(device).detach()\n", "\n", " # train critic\n", " values = self.critic(state, action)\n", " critic_loss_element_wise = (values - curr_return).pow(2)\n", "\n", " return critic_loss_element_wise\n", " \n", " def _target_soft_update(self):\n", " \"\"\"Soft-update: target = tau*local + (1-tau)*target.\"\"\"\n", " tau = self.tau\n", " \n", " for t_param, l_param in zip(\n", " self.actor_target.parameters(), self.actor.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", " \n", " for t_param, l_param in zip(\n", " self.critic_target.parameters(), self.critic.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", " \n", " def _get_n_step_info_from_demo(\n", " self, demo: List, n_step: int\n", " ) -> Tuple[List, List]:\n", " \"\"\"Return 1 step and n step demos.\"\"\"\n", " demos_1_step = list()\n", " demos_n_step = list()\n", " n_step_buffer: Deque = deque(maxlen=n_step)\n", "\n", " for transition in demo:\n", " n_step_buffer.append(transition)\n", "\n", " if len(n_step_buffer) == n_step:\n", " # add a single step transition\n", " demos_1_step.append(n_step_buffer[0])\n", "\n", " # add a multi step transition\n", " curr_state, action = n_step_buffer[0][:2]\n", " \n", " # get n-step info\n", " reward, next_state, done = n_step_buffer[-1][-3:]\n", " for transition in reversed(list(n_step_buffer)[:-1]):\n", " r, n_o, d = transition[-3:]\n", "\n", " reward = r + self.gamma * reward * (1 - d)\n", " next_state, done = (n_o, d) if d else (next_state, done)\n", " \n", " transition = (curr_state, action, reward, next_state, done)\n", " demos_n_step.append(transition)\n", "\n", " return demos_1_step, demos_n_step\n", " \n", " def _plot(\n", " self, \n", " frame_idx: int, \n", " scores: 
List[float], \n", " actor_losses: List[float], \n", " critic_losses: List[float], \n", " n_demo: List[int],\n", " ):\n", " \"\"\"Plot the training progress.\"\"\"\n", " def subplot(loc: int, title: str, values: List[float]):\n", " plt.subplot(loc)\n", " plt.title(title)\n", " plt.plot(values)\n", " \n", " subplot_params = [\n", " (141, f\"frame {frame_idx}. score: {np.mean(scores[-10:])}\", scores),\n", " (142, \"actor_loss\", actor_losses),\n", " (143, \"critic_loss\", critic_losses),\n", " (144, \"number of sampled demos\", n_demo),\n", " ]\n", " \n", " clear_output(True)\n", " plt.figure(figsize=(30, 5)) \n", " for loc, title, values in subplot_params:\n", " subplot(loc, title, values)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment\n", "*ActionNormalizer* is an action wrapper class that rescales actions from the normalized range (-1, 1) to the environment's actual action range (low, high). Thanks to this class, the agent can simply select action values within the zero-centered range (-1, 1)." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "class ActionNormalizer(gym.ActionWrapper):\n", " \"\"\"Rescale and relocate the actions.\"\"\"\n", "\n", " def action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (-1, 1) to (low, high).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = action * scale_factor + reloc_factor\n", " action = np.clip(action, low, high)\n", "\n", " return action\n", "\n", " def reverse_action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (low, high) to (-1, 1).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = (action - reloc_factor) / scale_factor\n", " action = np.clip(action, -1.0, 1.0)\n", "\n", " return action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see [the code](https://github.com/openai/gym/blob/master/gym/envs/classic_control/pendulum.py) and [configurations](https://github.com/openai/gym/blob/cedecb35e3428985fd4efad738befeb75b9077f1/gym/envs/__init__.py#L81) of Pendulum-v0 in OpenAI's repository." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/khkim/anaconda3/envs/pg-is-all-you-need/lib/python3.6/site-packages/gym/logger.py:30: UserWarning: \u001b[33mWARN: Box bound precision lowered by casting to float32\u001b[0m\n", " warnings.warn(colorize('%s: %s'%('WARN', msg % args), 'yellow'))\n" ] } ], "source": [ "# environment\n", "env_id = \"Pendulum-v0\"\n", "env = gym.make(env_id)\n", "env = ActionNormalizer(env)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize\n", "The demonstration is generated in advance by a well-trained agent. 
(The given demo.pkl contains transitions generated by the *03.DDPG* agent.)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import pickle\n", "\n", "# load demo on replay memory\n", "demo_path = \"demo.pkl\"\n", "with open(demo_path, \"rb\") as f:\n", " demo = pickle.load(f)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "cpu\n" ] } ], "source": [ "# parameters\n", "num_frames = 50000\n", "memory_size = 100000\n", "batch_size = 128\n", "ou_noise_theta = 1.0\n", "ou_noise_sigma = 0.1\n", "initial_random_steps = 10000\n", "n_step = 3\n", "pretrain_step = 1000\n", "\n", "agent = DDPGfDAgent(\n", " env, \n", " memory_size, \n", " batch_size,\n", " ou_noise_theta,\n", " ou_noise_sigma,\n", " demo,\n", " n_step=n_step,\n", " pretrain_step=pretrain_step,\n", " initial_random_steps=initial_random_steps,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "agent.train(num_frames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "Run the trained agent (1 episode)." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "score: -232.4411445000226\n" ] } ], "source": [ "# test\n", "if IN_COLAB:\n", " agent.env = gym.wrappers.Monitor(agent.env, \"videos\", force=True)\n", "frames = agent.test()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Render" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "if IN_COLAB: # for colab\n", " import base64\n", " import glob\n", " import io\n", " import os\n", "\n", " from IPython.display import HTML, display\n", "\n", " def ipython_show_video(path: str) -> None:\n", " \"\"\"Show a video at `path` within IPython Notebook.\"\"\"\n", " if not os.path.isfile(path):\n", " raise NameError(\"Cannot access: {}\".format(path))\n", "\n", " video = io.open(path, \"r+b\").read()\n", " encoded = base64.b64encode(video)\n", "\n", " # embed the base64-encoded video in an HTML5 video tag\n", " display(HTML(\n", " data=\"\"\"\n", " <video alt=\"test\" controls>\n", " <source src=\"data:video/mp4;base64,{0}\" type=\"video/mp4\"/>\n", " </video>\n", " \"\"\".format(encoded.decode(\"ascii\"))\n", " ))\n", "\n", " list_of_files = glob.glob(\"videos/*.mp4\")\n", " latest_file = max(list_of_files, key=os.path.getctime)\n", " print(latest_file)\n", " ipython_show_video(latest_file)\n", "\n", "else: # for jupyter\n", " from matplotlib import animation\n", " from JSAnimation.IPython_display import display_animation\n", " from IPython.display import display\n", "\n", "\n", " def display_frames_as_gif(frames):\n", " \"\"\"Displays a list of frames as a gif, with controls.\"\"\"\n", " patch = plt.imshow(frames[0])\n", " plt.axis('off')\n", "\n", " def animate(i):\n", " patch.set_data(frames[i])\n", "\n", " anim = animation.FuncAnimation(\n", " plt.gcf(), animate, frames = len(frames), interval=50\n", " )\n", " display(display_animation(anim, default_mode='loop'))\n", "\n", "\n", " # display \n", " display_frames_as_gif(frames)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "pg-is-all-you-need", "language": "python", "name": "pg-is-all-you-need" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }