{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration for Colab" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import sys\n", "IN_COLAB = \"google.colab\" in sys.modules\n", "\n", "if IN_COLAB:\n", " !apt install python-opengl\n", " !apt install ffmpeg\n", " !apt install xvfb\n", " !pip install pyvirtualdisplay\n", " from pyvirtualdisplay import Display\n", " \n", " # Start virtual display\n", " dis = Display(visible=0, size=(600, 400))\n", " dis.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 03. DDPG\n", "\n", "[T. P. Lillicrap et al., \"Continuous control with deep reinforcement learning.\" arXiv preprint arXiv:1509.02971, 2015.](https://arxiv.org/pdf/1509.02971.pdf)\n", "\n", "The Deep Q-Network (DQN) algorithm ([Mnih et al., 2013; 2015](https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf)) combines advances in deep learning with reinforcement learning. However, while DQN solves problems with high-dimensional observation spaces, it can only handle discrete and low-dimensional action spaces, because its greedy policy has to find the action that maximizes the action-value function. For learning in high-dimensional and continuous action spaces, the authors combine the actor-critic approach with insights from the recent success of DQN. Deep DPG (DDPG) is based on the deterministic policy gradient (DPG) algorithm ([Silver et al., 2014](http://proceedings.mlr.press/v32/silver14.pdf)).\n", "\n", "### Deterministic policy gradient\n", "The DPG algorithm maintains a parameterized actor function $\\mu(s|\\theta^{\\mu})$ which specifies the current policy by deterministically mapping states to a specific action. The critic $Q(s, a)$ is learned using the Bellman equation as in Q-learning. 
The actor is updated by applying the chain rule to the expected return from the start distribution $J$ with respect to the actor parameters:\n", "\n", "$$\n", "\\begin{align*}\n", "\\nabla_{\\theta^{\\mu}}J &\\approx E_{s_t\\sim\\rho^\\beta} [\\nabla_{\\theta^{\\mu}} Q(s,a|\\theta^Q)|_{s=s_t, a=\\mu(s_t|\\theta^\\mu)}] \\\\\n", "&= E_{s_t\\sim\\rho^\\beta} [\\nabla_{a} Q(s,a|\\theta^Q)|_{s=s_t, a=\\mu(s_t)} \\nabla_{\\theta^{\\mu}} \\mu(s|\\theta^\\mu)|_{s=s_t}]\n", "\\end{align*}\n", "$$\n", "\n", "### Replay buffer\n", "One challenge when using neural networks for reinforcement learning is that most optimization algorithms assume that **the samples are independently and identically distributed**. When the samples are generated by exploring sequentially in an environment, this assumption no longer holds. The authors used a **replay buffer** to address this issue. Transitions were sampled from the environment according to the exploration policy and the tuple $(s_t, a_t, r_t, s_{t+1})$ was stored in the replay buffer. At each timestep the actor and critic are updated by sampling a minibatch uniformly from the buffer. This allows the algorithm to benefit from learning across a set of **uncorrelated** transitions.\n", "\n", "### Soft update target network\n", "Since the network $Q(s,a|\\theta^Q)$ being updated is also used in calculating the target value, the Q update is prone to divergence. To avoid this, the authors use **a target network** as in DQN, but modified for actor-critic and using **soft target updates**. Target networks are created by copying the actor and critic networks, $\\mu'(s|\\theta^{\\mu'})$ and $Q'(s,a|\\theta^{Q'})$ respectively, and are used for calculating the target values. 
The weights of these target networks are then updated by having them slowly track the learned networks:\n", "\n", "$$\n", "\\theta' \\leftarrow \\tau \\theta + (1 - \\tau)\\theta' \\ \\ \\ \\text{with} \\ \\tau \\ll 1.\n", "$$\n", "\n", "This greatly improves the stability of learning.\n", "\n", "### Exploration for continuous action space\n", "An advantage of off-policy algorithms such as DDPG is that we can treat the problem of exploration independently from the learning algorithm. The authors construct an exploration policy $\\mu'$ by adding noise sampled from a noise process $\\mathcal{N}$ to the actor policy:\n", "\n", "$$\n", "\\mu'(s_t) = \\mu(s_t|\\theta^{\\mu}_t) + \\mathcal{N}\n", "$$\n", "\n", "$\\mathcal{N}$ can be chosen to suit the environment. The authors used an **Ornstein-Uhlenbeck process** to generate temporally correlated exploration." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import modules" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import copy\n", "import random\n", "from typing import Dict, List, Tuple\n", "\n", "import gym\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "\n", "from IPython.display import clear_output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if torch.backends.cudnn.enabled:\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "seed = 777\n", "torch.manual_seed(seed)\n", "np.random.seed(seed)\n", "random.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Replay buffer\n", "Typically, people implement replay buffers with one of the following three data structures:\n", "\n", "- collections.deque\n", "- list\n", "- numpy.ndarray\n", 
"\n", "**deque** is very easy to handle once you initialize its maximum length (e.g. deque(maxlen=buffer_size)). However, the indexing operation of deque gets terribly slow as it grows up because it is [internally doubly linked list](https://wiki.python.org/moin/TimeComplexity#collections.deque). On the other hands, **list** is an array, so it is relatively faster than deque when you sample batches at every step. Its amortized cost of Get item is [O(1)](https://wiki.python.org/moin/TimeComplexity#list).\n", "\n", "Last but not least, let's see **numpy.ndarray**. numpy.ndarray is even faster than list due to the fact that it is [a homogeneous array of fixed-size items](https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html#numpy.ndarray), so you can get the benefits of [locality of reference](https://en.wikipedia.org/wiki/Locality_of_reference), . Whereas list is an array of pointers to objects, even when all of them are of the same type.\n", "\n", "Here, we are going to implement a replay buffer using numpy.ndarray.\n", "\n", "Reference: \n", "- [OpenAI spinning-up](https://github.com/openai/spinningup/blob/master/spinup/algos/sac/sac.py#L10)\n", "- [rainbow-is-all-you-need](https://render.githubusercontent.com/view/ipynb?commit=032d11277cf2436853478a69ca5a4aba03202598&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f437572742d5061726b2f7261696e626f772d69732d616c6c2d796f752d6e6565642f303332643131323737636632343336383533343738613639636135613461626130333230323539382f30312e64716e2e6970796e62&nwo=Curt-Park%2Frainbow-is-all-you-need&path=01.dqn.ipynb&repository_id=191133946&repository_type=Repository#Replay-buffer)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class ReplayBuffer:\n", " \"\"\"A simple numpy replay buffer.\"\"\"\n", "\n", " def __init__(self, obs_dim: int, size: int, batch_size: int = 32):\n", " \"\"\"Initializate.\"\"\"\n", " self.obs_buf = np.zeros([size, 
obs_dim], dtype=np.float32)\n", " self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)\n", " self.acts_buf = np.zeros([size], dtype=np.float32)\n", " self.rews_buf = np.zeros([size], dtype=np.float32)\n", " self.done_buf = np.zeros([size], dtype=np.float32)\n", " self.max_size, self.batch_size = size, batch_size\n", " self.ptr, self.size = 0, 0\n", "\n", " def store(\n", " self,\n", " obs: np.ndarray,\n", " act: np.ndarray, \n", " rew: float, \n", " next_obs: np.ndarray, \n", " done: bool,\n", " ):\n", " \"\"\"Store the transition in buffer.\"\"\"\n", " self.obs_buf[self.ptr] = obs\n", " self.next_obs_buf[self.ptr] = next_obs\n", " self.acts_buf[self.ptr] = act\n", " self.rews_buf[self.ptr] = rew\n", " self.done_buf[self.ptr] = done\n", " self.ptr = (self.ptr + 1) % self.max_size\n", " self.size = min(self.size + 1, self.max_size)\n", "\n", " def sample_batch(self) -> Dict[str, np.ndarray]:\n", " \"\"\"Randomly sample a batch of experiences from memory.\"\"\"\n", " idxs = np.random.choice(self.size, size=self.batch_size, replace=False)\n", " return dict(obs=self.obs_buf[idxs],\n", " next_obs=self.next_obs_buf[idxs],\n", " acts=self.acts_buf[idxs],\n", " rews=self.rews_buf[idxs],\n", " done=self.done_buf[idxs])\n", "\n", " def __len__(self) -> int:\n", " return self.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## OU Noise\n", "The **Ornstein-Uhlenbeck** process generates temporally correlated exploration noise, which is effective for physical control problems with inertia.\n", "\n", "$$\n", "dx_t = \\theta(\\mu - x_t) dt + \\sigma dW_t\n", "$$\n", "\n", "Reference: \n", "- [Udacity github](https://github.com/udacity/deep-reinforcement-learning/blob/master/ddpg-pendulum/ddpg_agent.py)\n", "- [Wiki](https://en.wikipedia.org/wiki/Ornstein%E2%80%93Uhlenbeck_process)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class OUNoise:\n", " \"\"\"Ornstein-Uhlenbeck process.\n", " Taken from Udacity 
deep-reinforcement-learning github repository:\n", " https://github.com/udacity/deep-reinforcement-learning/blob/master/\n", " ddpg-pendulum/ddpg_agent.py\n", " \"\"\"\n", "\n", " def __init__(\n", " self, \n", " size: int, \n", " mu: float = 0.0, \n", " theta: float = 0.15, \n", " sigma: float = 0.2,\n", " ):\n", " \"\"\"Initialize parameters and noise process.\"\"\"\n", " self.state = np.float64(0.0)\n", " self.mu = mu * np.ones(size)\n", " self.theta = theta\n", " self.sigma = sigma\n", " self.reset()\n", "\n", " def reset(self):\n", " \"\"\"Reset the internal state (= noise) to mean (mu).\"\"\"\n", " self.state = copy.copy(self.mu)\n", "\n", " def sample(self) -> np.ndarray:\n", " \"\"\"Update internal state and return it as a noise sample.\"\"\"\n", " x = self.state\n", " # zero-mean Gaussian increment for the Wiener term dW_t (dt = 1)\n", " dx = self.theta * (self.mu - x) + self.sigma * np.random.normal(size=len(x))\n", " self.state = x + dx\n", " return self.state" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Network\n", "We use two separate networks for the actor and the critic. The actor network has three fully connected layers with three non-linearities: **ReLU** for the two hidden layers and **tanh** for the output layer. The critic network also has three fully connected layers, but it applies **ReLU** only to its two hidden layers and leaves the output linear. In addition, the input size of the critic network is the sum of the state and action dimensions. 
One thing to note is that we initialize the final layer's weights and biases so that they are **uniformly distributed.**" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class Actor(nn.Module):\n", " def __init__(\n", " self, \n", " in_dim: int, \n", " out_dim: int,\n", " init_w: float = 3e-3,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " super(Actor, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, out_dim)\n", " \n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(self, state: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = F.relu(self.hidden1(state))\n", " x = F.relu(self.hidden2(x))\n", " action = self.out(x).tanh()\n", " \n", " return action\n", " \n", " \n", "class Critic(nn.Module):\n", " def __init__(\n", " self, \n", " in_dim: int, \n", " init_w: float = 3e-3,\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " super(Critic, self).__init__()\n", " \n", " self.hidden1 = nn.Linear(in_dim, 128)\n", " self.hidden2 = nn.Linear(128, 128)\n", " self.out = nn.Linear(128, 1)\n", " \n", " self.out.weight.data.uniform_(-init_w, init_w)\n", " self.out.bias.data.uniform_(-init_w, init_w)\n", "\n", " def forward(\n", " self, state: torch.Tensor, action: torch.Tensor\n", " ) -> torch.Tensor:\n", " \"\"\"Forward method implementation.\"\"\"\n", " x = torch.cat((state, action), dim=-1)\n", " x = F.relu(self.hidden1(x))\n", " x = F.relu(self.hidden2(x))\n", " value = self.out(x)\n", " \n", " return value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DDPG Agent\n", "Here is a summary of DDPGAgent class.\n", "\n", "| Method | Note |\n", "|--- |--- |\n", "|select_action | select an action from the input state. |\n", "|step | take an action and return the response of the env. 
|\n", "|update_model | update the model by gradient descent. |\n", "|train | train the agent for num_frames steps. |\n", "|test | test the agent (1 episode). |\n", "|\\_target_soft_update| soft update from the local model to the target model.|\n", "|\\_plot | plot the training progress. |" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "class DDPGAgent:\n", " \"\"\"DDPGAgent interacting with environment.\n", " \n", " Attributes:\n", " env (gym.Env): openAI Gym environment\n", " actor (nn.Module): actor model to select actions\n", " actor_target (nn.Module): target actor model to predict next actions\n", " actor_optimizer (Optimizer): optimizer for training actor\n", " critic (nn.Module): critic model to predict action values\n", " critic_target (nn.Module): target critic model to predict action values\n", " critic_optimizer (Optimizer): optimizer for training critic\n", " memory (ReplayBuffer): replay memory to store transitions\n", " batch_size (int): batch size for sampling\n", " gamma (float): discount factor\n", " tau (float): parameter for soft target update\n", " initial_random_steps (int): initial random action steps\n", " noise (OUNoise): noise generator for exploration\n", " device (torch.device): cpu / gpu\n", " transition (list): temporary storage for the recent transition\n", " total_step (int): total step numbers\n", " is_test (bool): flag to show the current mode (train / test)\n", " \"\"\"\n", " def __init__(\n", " self,\n", " env: gym.Env,\n", " memory_size: int,\n", " batch_size: int,\n", " ou_noise_theta: float,\n", " ou_noise_sigma: float,\n", " gamma: float = 0.99,\n", " tau: float = 5e-3,\n", " initial_random_steps: int = int(1e4),\n", " ):\n", " \"\"\"Initialize.\"\"\"\n", " obs_dim = env.observation_space.shape[0]\n", " action_dim = env.action_space.shape[0]\n", "\n", " self.env = env\n", " self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)\n", " self.batch_size = batch_size\n", " self.gamma = 
gamma\n", " self.tau = tau\n", " self.initial_random_steps = initial_random_steps\n", " \n", " # noise\n", " self.noise = OUNoise(\n", " action_dim,\n", " theta=ou_noise_theta,\n", " sigma=ou_noise_sigma,\n", " )\n", "\n", " # device: cpu / gpu\n", " self.device = torch.device(\n", " \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", " )\n", " print(self.device)\n", "\n", " # networks\n", " self.actor = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target = Actor(obs_dim, action_dim).to(self.device)\n", " self.actor_target.load_state_dict(self.actor.state_dict())\n", " \n", " self.critic = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target = Critic(obs_dim + action_dim).to(self.device)\n", " self.critic_target.load_state_dict(self.critic.state_dict())\n", "\n", " # optimizer\n", " self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)\n", " self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)\n", " \n", " # transition to store in memory\n", " self.transition = list()\n", " \n", " # total steps count\n", " self.total_step = 0\n", "\n", " # mode: train / test\n", " self.is_test = False\n", " \n", " def select_action(self, state: np.ndarray) -> np.ndarray:\n", " \"\"\"Select an action from the input state.\"\"\"\n", " # if initial random action should be conducted\n", " if self.total_step < self.initial_random_steps and not self.is_test:\n", " selected_action = self.env.action_space.sample()\n", " else:\n", " selected_action = self.actor(\n", " torch.FloatTensor(state).to(self.device)\n", " ).detach().cpu().numpy()\n", " \n", " # add noise for exploration during training\n", " if not self.is_test:\n", " noise = self.noise.sample()\n", " selected_action = np.clip(selected_action + noise, -1.0, 1.0)\n", " \n", " self.transition = [state, selected_action]\n", " \n", " return selected_action\n", " \n", " def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:\n", " \"\"\"Take an action 
and return the response of the env.\"\"\"\n", " next_state, reward, done, _ = self.env.step(action)\n", " \n", " if not self.is_test:\n", " self.transition += [reward, next_state, done]\n", " self.memory.store(*self.transition)\n", " \n", " return next_state, reward, done\n", " \n", " def update_model(self) -> Tuple[torch.Tensor, torch.Tensor]:\n", " \"\"\"Update the model by gradient descent.\"\"\"\n", " device = self.device # for shortening the following lines\n", " \n", " samples = self.memory.sample_batch()\n", " state = torch.FloatTensor(samples[\"obs\"]).to(device)\n", " next_state = torch.FloatTensor(samples[\"next_obs\"]).to(device)\n", " action = torch.FloatTensor(samples[\"acts\"].reshape(-1, 1)).to(device)\n", " reward = torch.FloatTensor(samples[\"rews\"].reshape(-1, 1)).to(device)\n", " done = torch.FloatTensor(samples[\"done\"].reshape(-1, 1)).to(device)\n", " \n", " masks = 1 - done\n", " next_action = self.actor_target(next_state)\n", " next_value = self.critic_target(next_state, next_action)\n", " # TD target; detached so no gradient flows into the target networks\n", " curr_return = (reward + self.gamma * next_value * masks).detach()\n", " \n", " # train critic\n", " values = self.critic(state, action)\n", " critic_loss = F.mse_loss(values, curr_return)\n", " \n", " self.critic_optimizer.zero_grad()\n", " critic_loss.backward()\n", " self.critic_optimizer.step()\n", " \n", " # train actor\n", " actor_loss = -self.critic(state, self.actor(state)).mean()\n", " \n", " self.actor_optimizer.zero_grad()\n", " actor_loss.backward()\n", " self.actor_optimizer.step()\n", " \n", " # target update\n", " self._target_soft_update()\n", " \n", " return actor_loss.data, critic_loss.data\n", " \n", " def train(self, num_frames: int, plotting_interval: int = 200):\n", " \"\"\"Train the agent.\"\"\"\n", " self.is_test = False\n", " \n", " state = self.env.reset()\n", " actor_losses = []\n", " critic_losses = []\n", " scores = []\n", " score = 0\n", " \n", " for self.total_step in range(1, num_frames + 1):\n", " action = self.select_action(state)\n", " next_state, 
reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", "\n", " # if episode ends\n", " if done: \n", " state = self.env.reset()\n", " scores.append(score)\n", " score = 0\n", "\n", " # if training is ready\n", " if (\n", " len(self.memory) >= self.batch_size \n", " and self.total_step > self.initial_random_steps\n", " ):\n", " actor_loss, critic_loss = self.update_model()\n", " actor_losses.append(actor_loss)\n", " critic_losses.append(critic_loss)\n", " \n", " # plotting\n", " if self.total_step % plotting_interval == 0:\n", " self._plot(\n", " self.total_step, \n", " scores, \n", " actor_losses, \n", " critic_losses,\n", " )\n", " \n", " self.env.close()\n", " \n", " def test(self):\n", " \"\"\"Test the agent.\"\"\"\n", " self.is_test = True\n", " \n", " state = self.env.reset()\n", " done = False\n", " score = 0\n", " \n", " frames = []\n", " while not done:\n", " frames.append(self.env.render(mode=\"rgb_array\"))\n", " action = self.select_action(state)\n", " next_state, reward, done = self.step(action)\n", "\n", " state = next_state\n", " score += reward\n", " \n", " print(\"score: \", score)\n", " self.env.close()\n", " \n", " return frames\n", " \n", " def _target_soft_update(self):\n", " \"\"\"Soft-update: target = tau*local + (1-tau)*target.\"\"\"\n", " tau = self.tau\n", " \n", " for t_param, l_param in zip(\n", " self.actor_target.parameters(), self.actor.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", " \n", " for t_param, l_param in zip(\n", " self.critic_target.parameters(), self.critic.parameters()\n", " ):\n", " t_param.data.copy_(tau * l_param.data + (1.0 - tau) * t_param.data)\n", " \n", " def _plot(\n", " self, \n", " frame_idx: int, \n", " scores: List[float], \n", " actor_losses: List[float], \n", " critic_losses: List[float], \n", " ):\n", " \"\"\"Plot the training progresses.\"\"\"\n", " def subplot(loc: int, title: str, values: List[float]):\n", " 
plt.subplot(loc)\n", " plt.title(title)\n", " plt.plot(values)\n", "\n", " subplot_params = [\n", " (131, f\"frame {frame_idx}. score: {np.mean(scores[-10:])}\", scores),\n", " (132, \"actor_loss\", actor_losses),\n", " (133, \"critic_loss\", critic_losses),\n", " ]\n", " \n", " clear_output(True)\n", " plt.figure(figsize=(30, 5))\n", " for loc, title, values in subplot_params:\n", " subplot(loc, title, values)\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environment\n", "*ActionNormalizer* is an action wrapper class that rescales action values from the range (-1, 1) to the environment's own action range (low, high). Thanks to this class, the agent can simply select action values within the zero-centered range (-1, 1)." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "class ActionNormalizer(gym.ActionWrapper):\n", " \"\"\"Rescale and relocate the actions.\"\"\"\n", "\n", " def action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (-1, 1) to (low, high).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = action * scale_factor + reloc_factor\n", " action = np.clip(action, low, high)\n", "\n", " return action\n", "\n", " def reverse_action(self, action: np.ndarray) -> np.ndarray:\n", " \"\"\"Change the range (low, high) to (-1, 1).\"\"\"\n", " low = self.action_space.low\n", " high = self.action_space.high\n", "\n", " scale_factor = (high - low) / 2\n", " reloc_factor = high - scale_factor\n", "\n", " action = (action - reloc_factor) / scale_factor\n", " action = np.clip(action, -1.0, 1.0)\n", "\n", " return action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see [the code](https://github.com/openai/gym/blob/master/gym/envs/classic_control/pendulum.py) and 
[configurations](https://github.com/openai/gym/blob/cedecb35e3428985fd4efad738befeb75b9077f1/gym/envs/__init__.py#L81) of Pendulum-v0 from OpenAI's repository." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# environment\n", "env_id = \"Pendulum-v0\"\n", "env = gym.make(env_id)\n", "env = ActionNormalizer(env)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set random seed" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[777]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def seed_torch(seed):\n", " torch.manual_seed(seed)\n", " if torch.backends.cudnn.enabled:\n", " torch.backends.cudnn.benchmark = False\n", " torch.backends.cudnn.deterministic = True\n", "\n", "seed = 777\n", "random.seed(seed)\n", "np.random.seed(seed)\n", "seed_torch(seed)\n", "env.seed(seed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "cuda\n" ] } ], "source": [ "# parameters\n", "num_frames = 50000\n", "memory_size = 100000\n", "batch_size = 128\n", "ou_noise_theta = 1.0\n", "ou_noise_sigma = 0.1\n", "initial_random_steps = 10000\n", "\n", "agent = DDPGAgent(\n", " env, \n", " memory_size, \n", " batch_size,\n", " ou_noise_theta,\n", " ou_noise_sigma,\n", " initial_random_steps=initial_random_steps\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "agent.train(num_frames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "Run the trained agent (1 episode)." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "score: -241.1583482566556\n" ] } ], "source": [ "# test\n", "if IN_COLAB:\n", " agent.env = gym.wrappers.Monitor(agent.env, \"videos\", force=True)\n", "frames = agent.test()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Render" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "if IN_COLAB: # for colab\n", " import base64\n", " import glob\n", " import io\n", " import os\n", "\n", " from IPython.display import HTML, display\n", "\n", " def ipython_show_video(path: str) -> None:\n", " \"\"\"Show a video at `path` within IPython Notebook.\"\"\"\n", " if not os.path.isfile(path):\n", " raise NameError(\"Cannot access: {}\".format(path))\n", "\n", " video = io.open(path, \"r+b\").read()\n", " encoded = base64.b64encode(video)\n", "\n", " display(HTML(\n", " data=\"\"\"\n", " <video alt=\"test\" controls><source src=\"data:video/mp4;base64,{0}\" type=\"video/mp4\"/></video>\n", " \"\"\".format(encoded.decode(\"ascii\"))\n", " ))\n", "\n", " list_of_files = glob.glob(\"videos/*.mp4\")\n", " latest_file = max(list_of_files, key=os.path.getctime)\n", " print(latest_file)\n", " ipython_show_video(latest_file)\n", "\n", "else: # for jupyter\n", " from matplotlib import animation\n", " from JSAnimation.IPython_display import display_animation\n", " from IPython.display import display\n", "\n", "\n", " def display_frames_as_gif(frames):\n", " \"\"\"Displays a list of frames as a gif, with controls.\"\"\"\n", " patch = plt.imshow(frames[0])\n", " plt.axis('off')\n", "\n", " def animate(i):\n", " patch.set_data(frames[i])\n", "\n", " anim = animation.FuncAnimation(\n", " plt.gcf(), animate, frames = len(frames), interval=50\n", " )\n", " display(display_animation(anim, default_mode='loop'))\n", "\n", "\n", " # display \n", " display_frames_as_gif(frames)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "pgayn", "language": "python", "name": "pgayn" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }