{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%config IPCompleter.greedy = True\n", "%config InlineBackend.figure_format = 'retina'\n", "%matplotlib inline\n", "%load_ext tensorboard\n", "\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import pandas as pd\n", "import seaborn as sn\n", "import tensorflow as tf\n", "import os\n", "from datetime import datetime\n", "import sklearn\n", "import random\n", "import operator\n", "\n", "pd.set_option('mode.chained_assignment', None)\n", "sn.set(rc={'figure.figsize':(9,9)})\n", "sn.set(font_scale=1.4)\n", "\n", "# make results reproducible\n", "seed = 0\n", "random.seed(seed)  # the agents below sample with the `random` module\n", "np.random.seed(seed)\n", "tf.random.set_seed(13)" ] }, { "attachments": { "agent2.png": { "image/png": "" } }, "cell_type": "markdown", "metadata": {}, "source": [ "# Reinforcement Learning\n", "\n", "Reinforcement learning (RL) is an area of machine learning concerned with how software agents ought to take actions in an environment in order to maximize some notion of cumulative reward. Reinforcement learning is one of three basic machine learning paradigms, alongside supervised learning and unsupervised learning.\n", "\n", "Reinforcement learning differs from supervised learning in that it does not need labelled input/output pairs to be presented; the agent instead learns to act from evaluative feedback (rewards). The focus is on finding a balance between exploration (of uncharted territory) and exploitation (of current knowledge).\n", "\n", "The environment is typically stated in the form of a Markov decision process (MDP), because many reinforcement learning algorithms for this context utilize dynamic programming techniques. 
The main difference between the classical dynamic programming methods and reinforcement learning algorithms (that involve ANNs) is that the latter do not assume knowledge of an exact mathematical model of the MDP and they target large MDPs where exact methods become infeasible.\n", "\n", "---\n", "\n", "Reinforcement learning is *agent-oriented learning*: the agent learns by interacting with an environment to achieve a goal, by trial and error, with only delayed evaluative feedback (reward).\n", "\n", "However, RL does have some challenges:\n", "* We only have a reward signal as feedback\n", "* Feedback is often delayed\n", "* Time matters: the data are sequential and non-stationary\n", "* The data received are affected by the agent's previous interactions\n", "\n", "It has been applied successfully to various problems, including robot control, power control, telecommunications, backgammon, checkers and Go.\n", "\n", "## Problem formulation as a Markov Decision Process (MDP)\n", "\n", "Often we consider the problem of making a sequence of good decisions. In a discrete setting, an agent makes a sequence of **actions** $\\{a_t\\}$, observes a sequence of **observations** $\\{o_t\\}$ and receives a sequence of **rewards** $\\{r_t\\}$. We define the **history** at time $t$ to be $h_t = (a_1, o_1, r_1, \\dots, a_t, o_t, r_t)$. The agent's next action is a function of the history, that is, $a_{t+1} = f(h_t)$, and the problem of sequential decision making can be thought of as defining and computing the function $f$ appropriately.\n", "\n", "![agent2.png](attachment:agent2.png)\n", "\n", "\n", "At each step $t$ the agent (as illustrated above) executes action $a_t$, receives observation $o_t$ and receives a scalar reward $r_t$. The environment receives action $a_t$, emits observation $o_{t+1}$ and emits a scalar reward $r_{t+1}$.\n", "\n", "Our world may be in one of a set of possible states $S$, and the agent observes a sequence of states $\\{s_t\\}$. 
We often want to consider the **transition dynamics** of the world $P(s_{t+1}|s_t,a_t,\\dots,s_1,a_1)$, a probability distribution over $S$ conditioned on the previous states and actions. In RL, we often assume the **Markov property**:\n", "\n", "$$ P(s_{t+1}|s_t,a_t,\\dots,s_1,a_1)=P(s_{t+1}|s_t,a_t) $$\n", "\n", "A simple trick to make sure the Markov property holds is to use the history $h_t$ as the state. Usually, we consider the reward $r_t$ to be received on the transition between states, $s_t \\xrightarrow{a_t} s_{t+1}$. A reward function is used to predict rewards, $R(s, a, s') = \\mathop{\\mathbb{E}}[r_t|s_t = s, a_t = a, s_{t+1} = s']$. We will often consider the reward function to be of the form $R(s) = \\mathop{\\mathbb{E}}[r_t|s_t = s]$ or $R(s, a) = \\mathop{\\mathbb{E}}[r_t|s_t = s, a_t = a]$. A **model** consists of the above transition dynamics and reward function.\n", "\n", "The **agent state** is a function of the history, $s^a_t = g(h_t)$. An RL agent typically has an explicit representation of one or more of the following three things: a policy, a value function and, optionally, a model. A **policy** $\\pi$ is a mapping from the agent state to an action, $\\pi(s^a_t)\\in A$,\n", "or sometimes a stochastic distribution over actions, $\\pi(a_t|s^a_t)$. When the agent wants to take an action and $\\pi$ is stochastic, it picks action $a \\in A$ with probability $P(a_t = a) = \\pi(a|s^a_t)$. Given a policy $\\pi$ and discount factor $\\gamma \\in [0, 1]$, a **value function** $V^{\\pi}$ is an expected sum of discounted rewards (e.g. 
how good each state is; it can also be defined over state-action pairs),\n", "\n", "$$ V^\\pi(s) = \\mathop{\\mathbb{E}_\\pi}[r_t + \\gamma r_{t+1} + \\gamma^2 r_{t+2} + \\dots | s_t = s]$$\n", "\n", "where $\\mathop{\\mathbb{E}_\\pi}$ denotes that the expectation is taken over states encountered by following policy $\\pi$, and the discount factor $\\gamma$ is used to weigh immediate rewards against delayed rewards. Lastly, if the agent has a model (the agent's representation of the environment), we call it a **model-based** agent, and if it does not incorporate a model, we call it a **model-free** agent.\n", "\n", "The case where $o_t\\neq s_t$ is called **partially observable**. In partially observable settings it is common for RL algorithms to maintain a probability distribution over the true world state to define $s^a_t$, which is known as a **belief state**. However, in the majority of cases we will consider the fully observable case, where $o_t = s_t$, and will assume that $s^a_t = s_t$. [[1](https://web.stanford.edu/class/cs234/slides/lnotes_intro.pdf)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Markov Decision Process (MDP)\n", "\n", "A Markov decision process (MDP) is a discrete-time stochastic control process. It provides a mathematical framework for modeling decision making in situations where outcomes are partly random and partly under the control of a decision maker. It is fully defined by the tuple $(S, A, P, R, \\gamma)$:\n", "* $S$: set of all possible states\n", "* $A$: set of all possible actions\n", "* $R$: distribution of reward given a (state, action) pair\n", "* $P$: transition probability (of the world), i.e. the distribution over the next state given a (state, action) pair\n", "* $\\gamma$: discount factor\n", "\n", "The core problem of MDPs is to find a **policy** for the agent: a function $\\pi$ that specifies the action $\\pi(s)$ that the agent will choose when in state $s$. 
Once a Markov decision process is combined with a policy in this way, this fixes the action for each state and the resulting combination behaves like a Markov chain (since the action chosen in state $s$ is completely determined by $\\pi(s)$ and $\\Pr(s_{t+1}=s' \\mid s_t = s, a_t=a)$ reduces to $\\Pr(s_{t+1}=s' \\mid s_t = s)$, a Markov transition matrix).\n", "\n", "The goal is to choose a policy $\\pi$ that will maximize some cumulative function of the random rewards, typically the expected discounted sum over a potentially infinite horizon. Hence the optimal policy $\\pi^*$ and the optimal value function $V^*$ are given by:\n", "\n", "$$\\pi^* = \\operatorname{arg max}_{\\pi} \\mathop{\\mathbb{E}}[\\sum^{\\infty}_{t=0} {\\gamma^t R (s_t, a_t, s_{t+1})}] $$\n", "\n", "$$ V^*(s) = \\max_\\pi V^{\\pi}(s) = V^{\\pi^*}(s)$$\n", "\n", "(where we choose $a_t = \\pi(s_t)$, i.e. actions given by the policy), and the expectation is taken over $s_{t+1} \\sim P(s_{t+1} \\mid s_t, a_t)$,\n", "\n", "where $\\gamma$ is the discount factor satisfying $0 \\le \\gamma \\le 1$, which is usually close to 1.\n", "\n", "Because of the Markov property, the optimal policy for this particular problem can indeed be written as a function of $s$ only, as assumed above.\n", "\n", "The discount factor motivates the agent to favor taking actions early, rather than postponing them indefinitely.\n", "\n", "This optimal policy can be found through a variety of methods, like dynamic programming. Some solutions require knowledge of the state transition function $P$ (**model**) and the reward function $R$. Others can solve for the optimal policy of an MDP using experimentation alone.\n", "\n", "The standard family of algorithms to calculate this optimal policy requires storage for two arrays indexed by state: **value** $V$, which contains real values, and **policy** $\\pi$, which contains actions. 
At the end of the algorithm, $\\pi$ will contain the solution and $V(s)$ will contain the discounted sum of the rewards to be earned (on average) by following that solution from state $s$.\n", "\n", "The algorithm has two steps, (1) a policy update and (2) a value update, which are repeated in some order for all the states until no further changes take place. Both recursively compute \n", "a new estimate of the optimal policy and state value from an older estimate of those values (where $s'=s_{t+1}$).\n", "\n", "$$\\pi(s) := \\operatorname{argmax}_a \\left\\{ \\sum_{s'} P(s' \\mid s, a) \\left( R(s,a,s') + \\gamma V(s') \\right) \\right\\} $$\n", "\n", "$$V(s) := \\sum_{s'} P(s' \\mid s, \\pi(s)) \\left( R(s,\\pi(s),s') + \\gamma V(s') \\right)$$\n", "\n", "Their order depends on the variant of the algorithm; one can also do them for all states at once or state by state, and more often to some states than others. As long as no state is permanently excluded from either of the steps, the algorithm will eventually arrive at the correct solution. Some of the variants are:\n", "\n", "## Value iteration\n", "\n", "In value iteration, which is also called backward induction,\n", "the $\\pi$ function is not used; instead, the value of $\\pi(s)$ is calculated within $V(s)$ whenever it is needed. Substituting the calculation of $\\pi(s)$ into the calculation of $V(s)$ gives the combined step\n", "\n", "$$ V_{i+1}(s) := \\max_a \\left\\{ \\sum_{s'} P(s' \\mid s, a) \\left( R(s,a,s') + \\gamma V_i(s') \\right) \\right\\}, $$\n", "\n", "where $i$ is the iteration number. Value iteration starts at $i = 0$ with $V_0$ as a guess of the value function. It then iterates, repeatedly computing $V_{i+1}$ for all states $s$, until $V$ converges with the left-hand side equal to the right-hand side (which is the *Bellman equation* for this problem).\n", "\n", "## Policy iteration\n", "\n", "In policy iteration, step one is performed once, and then step two is repeated until it converges. 
Then step one is again performed once, and so on.\n", "\n", "Instead of repeating the value update to convergence, it may be formulated and solved as a set of linear equations. These equations are obtained by treating the value update as an equality in the unknowns $V(s)$, one equation per state. Thus, repeating the value update to convergence can be interpreted as solving the linear equations by relaxation (an iterative method).\n", "\n", "This variant has the advantage that there is a definite stopping condition: when the array $\\pi$ does not change in the course of applying the policy update to all states, the algorithm is completed.\n", "\n", "Policy iteration is usually slower than value iteration for a large number of possible states. Solving the linear equations exactly takes $O(n^3)$ time for $n$ states, making it impractical for large state spaces.\n", "\n", "\n", "Let's illustrate these with an MDP example. We first create an MDP class.\n", "\n", "# MDP example\n", "\n", "[[2](http://aima.cs.berkeley.edu/)]\n", "[[3](https://github.com/aimacode/aima-python/blob/master/mdp.ipynb)]" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "class BaseMDP:\n", "    \"\"\"A Markov Decision Process, defined by an initial state, transition model,\n", "    and reward function. We also keep track of a gamma value, for use by\n", "    algorithms. The transition model is represented somewhat differently from\n", "    the text. Instead of P(s' | s, a) being a probability number for each\n", "    state/state/action triplet, we instead have T(s, a) return a\n", "    list of (p, s') pairs. We also keep track of the possible states,\n", "    terminal states, and actions for each state. 
\"\"\"\n", "\n", " def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):\n", " if not (0 < gamma <= 1):\n", " raise ValueError(\"An MDP must have 0 < gamma <= 1\")\n", "\n", " # collect states from transitions table if not passed.\n", " self.states = states or self.get_states_from_transitions(transitions)\n", "\n", " self.init = init\n", "\n", " if isinstance(actlist, list):\n", " # if actlist is a list, all states have the same actions\n", " self.actlist = actlist\n", "\n", " elif isinstance(actlist, dict):\n", " # if actlist is a dict, different actions for each state\n", " self.actlist = actlist\n", "\n", " self.terminals = terminals\n", " self.transitions = transitions or {}\n", " if not self.transitions:\n", " print(\"Warning: Transition table is empty.\")\n", "\n", " self.gamma = gamma\n", "\n", " self.reward = reward or {s: 0 for s in self.states}\n", "\n", " # self.check_consistency()\n", "\n", " def R(self, state):\n", " \"\"\"Return a numeric reward for this state.\"\"\"\n", "\n", " return self.reward[state]\n", "\n", " def T(self, state, action):\n", " \"\"\"Transition model. From a state and an action, return a list\n", " of (probability, result-state) pairs.\"\"\"\n", "\n", " if not self.transitions:\n", " raise ValueError(\"Transition model is missing\")\n", " else:\n", " return self.transitions[state][action]\n", "\n", " def actions(self, state):\n", " \"\"\"Return a list of actions that can be performed in this state. By default, a\n", " fixed list of actions, except for terminal states. 
Override this\n", " method if you need to specialize by state.\"\"\"\n", "\n", " if state in self.terminals:\n", " return [None]\n", " else:\n", " return self.actlist\n", "\n", " def get_states_from_transitions(self, transitions):\n", " if isinstance(transitions, dict):\n", " s1 = set(transitions.keys())\n", " s2 = set(tr[1] for actions in transitions.values()\n", " for effects in actions.values()\n", " for tr in effects)\n", " return s1.union(s2)\n", " else:\n", " print('Could not retrieve states from transitions')\n", " return None\n", "\n", " def check_consistency(self):\n", "\n", " # check that all states in transitions are valid\n", " assert set(self.states) == self.get_states_from_transitions(self.transitions)\n", "\n", " # check that init is a valid state\n", " assert self.init in self.states\n", "\n", " # check reward for each state\n", " assert set(self.reward.keys()) == set(self.states)\n", "\n", " # check that all terminals are valid states\n", " assert all(t in self.states for t in self.terminals)\n", "\n", " # check that probability distributions for all actions sum to 1\n", " for s1, actions in self.transitions.items():\n", " for a in actions.keys():\n", " s = 0\n", " for o in actions[a]:\n", " s += o[0]\n", " assert abs(s - 1) < 0.001\n", "\n", " \n", "class MDP(BaseMDP):\n", " def __init__(self, init, terminals, transition_matrix, reward = None, gamma=.9):\n", " # All possible actions.\n", " actlist = []\n", " for state in transition_matrix.keys():\n", " actlist.extend(transition_matrix[state])\n", " actlist = list(set(actlist))\n", " BaseMDP.__init__(self, init, actlist, terminals, transition_matrix, reward, gamma=gamma)\n", "\n", " def T(self, state, action):\n", " if action is None:\n", " return [(0.0, state)]\n", " else: \n", " return self.transitions[state][action]" ] }, { "attachments": { "mdp-1.png": { "image/png": "" } }, "cell_type": "markdown", "metadata": {}, "source": [ "Lets implement the simple MDP in the image below. 
States A and B each have actions X and Y available. The transition probabilities are shown just above the arrows. We use `BaseMDP` as the base class for our `MDP` class, with a few changes to suit this case: because the transitions are not very simple, we describe them with a transition matrix.\n", "\n", "![mdp-1.png](attachment:mdp-1.png)\n", "[[4](https://github.com/aimacode/aima-python/blob/master/mdp.ipynb)]" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Transition matrix as a nested dict: state -> action -> list of (probability, next state) tuples\n", "t = {\n", "    \"A\": {\n", "        \"X\": [(0.3, \"A\"), (0.7, \"B\")],\n", "        \"Y\": [(1.0, \"A\")]\n", "    },\n", "    \"B\": {\n", "        \"X\": [(0.8, \"End\"), (0.2, \"B\")],\n", "        \"Y\": [(1.0, \"A\")]\n", "    },\n", "    \"End\": {}\n", "}\n", "\n", "init = \"A\"\n", "\n", "terminals = [\"End\"]\n", "\n", "rewards = {\n", "    \"A\": 5,\n", "    \"B\": -10,\n", "    \"End\": 100\n", "}\n", "\n", "mdp_1 = MDP(init, terminals, t, rewards, gamma=.9)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Value Iteration" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def value_iteration(mdp, epsilon=0.001):\n", "    \"\"\"Solve an MDP by value iteration.\"\"\"\n", "    history = []\n", "    V1 = {s: 0 for s in mdp.states}\n", "    R, T, gamma = mdp.R, mdp.T, mdp.gamma\n", "    while True:\n", "        V = V1.copy()  # snapshot of the values from the previous sweep\n", "        history.append(V)\n", "        delta = 0\n", "        for s in mdp.states:\n", "            # Bellman update: reward plus discounted value of the best action\n", "            V1[s] = R(s) + gamma * max(sum(p * V[s1] for (p, s1) in T(s, a))\n", "                                       for a in mdp.actions(s))\n", "            delta = max(delta, abs(V1[s] - V[s]))\n", "        # stop once the largest change in a sweep is small enough\n", "        if delta <= epsilon * (1 - gamma) / gamma:\n", "            return V, pd.DataFrame(history)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'A': 72.10147625879485, 'B': 75.60975599852492, 'End': 100.0}\n" ] } ], "source": [ "V, history = 
value_iteration(mdp_1)\n", "print(V)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "image/png": { "height": 546, "width": 571 } }, "output_type": "display_data" } ], "source": [ "history.plot()\n", "plt.xlabel('Iterations')\n", "plt.ylabel('State Value')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Policy Iteration" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def expected_value(a, s, V, mdp):\n", " \"\"\"The expected value of doing a in state s, according to the MDP and V.\"\"\"\n", "\n", " return sum(p * V[s1] for (p, s1) in mdp.T(s, a))\n", "\n", "def policy_evaluation(pi, V, mdp, k=20):\n", " \"\"\"Return an updated value mapping V from each state in the MDP to its\n", " value, using an approximation (modified policy iteration).\"\"\"\n", "\n", " R, T, gamma = mdp.R, mdp.T, mdp.gamma\n", " for i in range(k):\n", " for s in mdp.states:\n", " V[s] = R(s) + gamma * sum(p * V[s1] for (p, s1) in T(s, pi[s]))\n", " return V\n", "\n", "def policy_iteration(mdp):\n", " \"\"\"Solve an MDP by policy iteration\"\"\"\n", "\n", " history = []\n", " V = {s: 0 for s in mdp.states}\n", " pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}\n", " while True:\n", " V = policy_evaluation(pi, V, mdp)\n", " history.append(V)\n", " unchanged = True\n", " for s in mdp.states:\n", " a = max(mdp.actions(s), key=lambda a: expected_value(a, s, V, mdp))\n", " if a != pi[s]:\n", " pi[s] = a\n", " unchanged = False\n", " if unchanged:\n", " return pi, V, pd.DataFrame(history)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Value Function: \n", " {'A': 72.10157032031375, 'B': 75.60975609756038, 'End': 100.0}\n", "\n", "Policy: \n", " {'A': 'X', 'B': 'X', 'End': None}\n" ] } ], "source": [ "pi, V, history = policy_iteration(mdp_1)\n", "print('Value Function: \\n', V)\n", "print('\\nPolicy: \\n', pi)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reinforcement 
Learning" ] }, { "attachments": { "grid_mdp.jpg": { "image/jpeg": "" }, "grid_mdp_agent.jpg": { "image/jpeg": "" } }, "cell_type": "markdown", "metadata": {}, "source": [ "## Passive Reinforcement Learning\n", "\n", "In passive reinforcement learning the agent follows a fixed policy $\\pi$. It attempts to evaluate this policy without any knowledge of the reward function $R(s)$ or the transition model $P(s_{t+1}|s_t,a_t)$.\n", "\n", "This is achieved by **value estimation**, where the agent attempts to directly learn the value of each state that would result from following the policy. Although at each step it *perceives* the reward and the state, it has no global knowledge of these. Thus, if the policy offers only a very low probability of reaching some state $s_+$, the agent may never perceive the reward $R(s_+)$.\n", "\n", "For the sequence of states $s_0, s_1, \\dots$ visited by following $\\pi$, the value being estimated is\n", "\n", "$$V^{\\pi}(s) = \\mathop{\\mathbb{E}_\\pi}\\left[\\sum_{t=0}^\\infty \\gamma^t R(s_t) \\mid s_0 = s\\right]$$\n", "\n", "that is, the expected sum of discounted rewards until termination.\n", "\n", "Based on this concept, we discuss three methods of estimating the value:\n", "\n", "1. **Direct Value Estimation (DVE)**\n", "    \n", "    The first, most naive method of estimating value comes from the simplest interpretation of the above definition. We construct an agent that follows the policy until it reaches the terminal state. At each step, it logs its current state and reward. Once it reaches the terminal state, it can estimate the value of each state for *that* iteration by simply summing the discounted rewards from that state to the terminal one.\n", "\n", "    It can now run this *simulation* $n$ times, and calculate the average value of each state. If a state occurs more than once in a simulation, each occurrence's value is counted separately.\n", "    \n", "    Note that this method may be prohibitively slow for very large state spaces. 
Besides, **it pays no attention to the transition probability model $P(s_{t+1}|s_t,a_t)$.** It misses out on information that it is capable of collecting (say, by recording the number of times an action from one state led to another state). The next method addresses this issue.\n", "    \n", "2. **Adaptive Dynamic Programming (ADP)**\n", "    \n", "    This method makes use of knowledge of the past state $s_t$, the action $a_t$, and the new perceived state $s_{t+1}$ to estimate the transition probability model $P(s_{t+1}|s_t,a_t)$. It does this by simply counting the new states that result from previous states and actions.\n", "    The program runs through the policy a number of times, keeping track of:\n", "    * each occurrence of state $s_t$ and the policy-recommended action $a_t$ in $N_{s_t, a_t}$ (table of frequencies for state-action pairs, initially zero)\n", "    * each occurrence of $s_{t+1}$ resulting from $a_t$ on $s_t$ in $N_{s_{t+1}|s_t, a_t}$ (table of outcome frequencies given state-action pairs, initially zero)\n", "    \n", "    It can thus estimate $P(s_{t+1}|s_t,a_t)$ as $N_{s_{t+1}|s_t, a_t}/N_{s_t, a_t}$, which, in the limit of infinite trials, will converge to the true value.\n", "    Using the transition probabilities thus estimated, it can apply **Policy Evaluation** to estimate the values $V(s)$, relying on the convergence properties of the Bellman equations.\n", "\n", "3. **Temporal-difference learning (TD)**\n", "    \n", "    Instead of explicitly building the transition model $P$, the temporal-difference model makes use of the expected closeness between the values of two consecutive states $s_{t}$ and $s_{t+1}$.\n", "    For the transition from $s_t$ to $s_{t+1}$, with learning rate $\\alpha$, the update is written as:\n", "    \n", "$$V^{\\pi}(s_t) \\leftarrow V^{\\pi}(s_t) + \\alpha \\left( R(s_t) + \\gamma V^{\\pi}(s_{t+1}) - V^{\\pi}(s_t) \\right)$$\n", "    This update implicitly incorporates the transition probabilities, because each successor state contributes to the update in proportion to the number of times it is reached from the current state. 
Thus, over a number of iterations, the estimates converge toward the solution of the Bellman equations.\n", "    The advantage of the TD learning model is its relatively simple computation at each step, rather than having to keep track of various counts.\n", "    For $n_s$ states and $n_a$ actions the ADP model would have $n_s \\times n_a$ numbers $N_{s_t, a_t}$ and $n_s^2 \\times n_a$ numbers $N_{s_{t+1}|s_t, a_t}$ to keep track of. The TD model must only keep track of a value $V(s)$ for each state.\n", "\n", "## Example\n", "\n", "Let's create a new MDP class to represent a grid world MDP. We assume that the environment is fully observable, so that the agent always knows where it is. The rewards are **+1** and **-1** in the terminal states, and **-0.04** in the rest (to help learn the shortest route).\n", "\n", "![grid_mdp.jpg](attachment:grid_mdp.jpg)\n", "[[5](https://github.com/aimacode/aima-python/blob/master/mdp.ipynb)]\n", "\n", "> This is the environment for our agent.\n", "\n", "We also assume that the transitions are **Markovian**, that is, the probability of reaching state $s_{t+1}$ from state $s_t$ depends only on $s_t$ (and the action taken) and not on the history of earlier states.\n", "Almost all stochastic decision problems can be reframed as a Markov Decision Process just by tweaking the definition of a state for that particular problem.\n", "\n", "However, the actions of our agent in this environment are unreliable. In other words, the motion of our agent is stochastic. 
\n", "\n", "More specifically, the agent may:\n", "* move correctly in the intended direction with a probability of $0.8$ \n", "* move $90^\\circ$ to the right of the intended direction with a probability $0.1$\n", "* move $90^\\circ$ to the left of the intended direction with a probability $0.1$\n", "\n", "The agent stays put if it bumps into a wall.\n", "\n", "![grid_mdp_agent.jpg](attachment:grid_mdp_agent.jpg)\n", "[[6](https://github.com/aimacode/aima-python/blob/master/mdp.ipynb)]\n", "\n", "For any given state, the actions the agent can take are encoded as given below:\n", "- Move Up: (0, 1)\n", "- Move Down: (0, -1)\n", "- Move Left: (-1, 0)\n", "- Move Right: (1, 0)\n", "- Do nothing: `None`\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "orientations = EAST, NORTH, WEST, SOUTH = [(1, 0), (0, 1), (-1, 0), (0, -1)]\n", "turns = LEFT, RIGHT = (+1, -1)\n", "\n", "\n", "def turn_heading(heading, inc, headings=orientations):\n", " return headings[(headings.index(heading) + inc) % len(headings)]\n", "\n", "\n", "def turn_right(heading):\n", " return turn_heading(heading, RIGHT)\n", "\n", "\n", "def turn_left(heading):\n", " return turn_heading(heading, LEFT)\n", "\n", "def vector_add(a, b):\n", " \"\"\"Component-wise addition of two vectors.\"\"\"\n", " return tuple(map(operator.add, a, b))\n", "\n", "class GridMDP(BaseMDP):\n", " \"\"\"A two-dimensional grid MDP. All you have to do is\n", " specify the grid as a list of lists of rewards; use None for an obstacle\n", " (unreachable state). Also, you should specify the terminal states.\n", " An action is an (x, y) unit vector; e.g. 
(1, 0) means move east.\"\"\"\n", "\n", " def __init__(self, grid, terminals, init=(0, 0), gamma=.9):\n", " grid.reverse() # because we want row 0 on bottom, not on top\n", " reward = {}\n", " states = set()\n", " self.rows = len(grid)\n", " self.cols = len(grid[0])\n", " self.grid = grid\n", " for x in range(self.cols):\n", " for y in range(self.rows):\n", " if grid[y][x]:\n", " states.add((x, y))\n", " reward[(x, y)] = grid[y][x]\n", " self.states = states\n", " actlist = orientations\n", " transitions = {}\n", " for s in states:\n", " transitions[s] = {}\n", " for a in actlist:\n", " transitions[s][a] = self.calculate_T(s, a)\n", " BaseMDP.__init__(self, init, actlist=actlist,\n", " terminals=terminals, transitions=transitions,\n", " reward=reward, states=states, gamma=gamma)\n", "\n", " def calculate_T(self, state, action):\n", " if action:\n", " return [(0.8, self.go(state, action)),\n", " (0.1, self.go(state, turn_right(action))),\n", " (0.1, self.go(state, turn_left(action)))]\n", " else:\n", " return [(0.0, state)]\n", "\n", " def T(self, state, action):\n", " return self.transitions[state][action] if action else [(0.0, state)]\n", "\n", " def go(self, state, direction):\n", " \"\"\"Return the state that results from going in this direction.\"\"\"\n", "\n", " state1 = vector_add(state, direction)\n", " return state1 if state1 in self.states else state\n", "\n", " def to_grid(self, mapping):\n", " \"\"\"Convert a mapping from (x, y) to v into a [[..., v, ...]] grid.\"\"\"\n", "\n", " return list(reversed([[mapping.get((x, y), None)\n", " for x in range(self.cols)]\n", " for y in range(self.rows)]))\n", "\n", " def to_arrows(self, policy):\n", " chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}\n", " return self.to_grid({s: chars[a] for (s, a) in policy.items()})\n", "\n", "sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],\n", " [-0.04, None, -0.04, -1],\n", " [-0.04, -0.04, -0.04, -0.04]],\n", " terminals=[(3, 2), 
(3, 1)])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Direct Value Estimation (DVE)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Calculated Values are:\n", "\n", "(0, 1):0.8223110453090214\n", "(1, 2):0.9145057678057944\n", "(3, 2):1.0\n", "(0, 0):0.7679737120225807\n", "(2, 2):0.9574995732260843\n", "(1, 0):0.6952294921875\n", "(0, 2):0.874002043431046\n", "(2, 1):-0.06218255360921221\n", "(3, 1):-1.0\n" ] } ], "source": [ "def run_single_trial(agent_program, mdp):\n", " \"\"\"Execute trial for given agent_program\n", " and mdp. mdp should be an instance of subclass\n", " of mdp.MDP \"\"\"\n", "\n", " def take_single_action(mdp, s, a):\n", " \"\"\"\n", " Select outcome of taking action a\n", " in state s. Weighted Sampling.\n", " \"\"\"\n", " x = random.uniform(0, 1)\n", " cumulative_probability = 0.0\n", " for probability_state in mdp.T(s, a):\n", " probability, state = probability_state\n", " cumulative_probability += probability\n", " if x < cumulative_probability:\n", " break\n", " return state\n", "\n", " current_state = mdp.init\n", " while True:\n", " current_reward = mdp.R(current_state)\n", " percept = (current_state, current_reward)\n", " next_action = agent_program(percept)\n", " if next_action is None:\n", " break\n", " current_state = take_single_action(mdp, current_state, next_action)\n", "\n", "class PassiveDVEAgent:\n", " \"\"\"\n", " Passive (non-learning) agent that uses direct value estimation\n", " on a given MDP and policy.\n", " \"\"\"\n", "\n", " def __init__(self, pi, mdp):\n", " self.pi = pi\n", " self.mdp = mdp\n", " self.V = {}\n", " self.s = None\n", " self.a = None\n", " self.s_history = []\n", " self.r_history = []\n", " self.init = mdp.init\n", "\n", " def __call__(self, percept):\n", " s1, r1 = percept\n", " self.s_history.append(s1)\n", " self.r_history.append(r1)\n", " if s1 in self.mdp.terminals:\n", " self.s = self.a = 
None\n", " else:\n", " self.s, self.a = s1, self.pi[s1]\n", " return self.a\n", "\n", " def estimate_V(self):\n", " # this function can be called only if the MDP has reached a terminal state\n", " # it will also reset the mdp history\n", " assert self.a is None, 'MDP is not in terminal state'\n", " assert len(self.s_history) == len(self.r_history)\n", " # calculating the utilities based on the current iteration\n", " V2 = {s: [] for s in set(self.s_history)}\n", " for i in range(len(self.s_history)):\n", " s = self.s_history[i]\n", " V2[s] += [sum(self.r_history[i:])]\n", " V2 = {k: sum(v) / max(len(v), 1) for k, v in V2.items()}\n", " # resetting history\n", " self.s_history, self.r_history = [], []\n", " # setting the new utilities to the average of the previous \n", " # iteration and this one\n", " for k in V2.keys():\n", " if k in self.V.keys():\n", " self.V[k] = (self.V[k] + V2[k]) / 2\n", " else:\n", " self.V[k] = V2[k]\n", " return self.V\n", "\n", " def update_state(self, percept):\n", " \"\"\"To be overridden in most cases. The default case\n", " assumes the percept to be of type (state, reward)\"\"\"\n", " return percept\n", "\n", "# We need instantiate the DVE with a policy\n", "policy = {\n", " (0, 2): EAST, (1, 2): EAST, (2, 2): EAST, (3, 2): None,\n", " (0, 1): NORTH, (2, 1): NORTH, (3, 1): None,\n", " (0, 0): NORTH, (1, 0): WEST, (2, 0): WEST, (3, 0): WEST, \n", "} \n", "\n", "DVEagent = PassiveDVEAgent(policy, sequential_decision_environment)\n", "for i in range(200):\n", " run_single_trial(DVEagent, sequential_decision_environment)\n", " DVEagent.estimate_V()\n", " \n", "print('Calculated Values are:\\n')\n", "print('\\n'.join([str(k)+':'+str(v) for k, v in DVEagent.V.items()]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Adaptive Dynamic Programming Agent\n", "\n", "`PassiveADPAgent` uses state transition and occurrence counts to estimate $P$, and then $V$." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Warning: Transition table is empty.\n", "\n", "Calculated Values are:\n", "\n", "(0, 0):0.2976135513656844\n", "(0, 1):0.4036610839314542\n", "(1, 2):0.6521588438729153\n", "(3, 2):1.0\n", "(3, 0):0.0\n", "(3, 1):-1.0\n", "(2, 1):0.5396460099257762\n", "(2, 0):0.0\n", "(2, 2):0.8006301473972047\n", "(1, 0):0.20515874335052375\n", "(0, 2):0.5175335514814762\n" ] } ], "source": [ "from collections import defaultdict\n", "\n", "class PassiveADPAgent:\n", "    \"\"\"\n", "    Passive (non-learning) agent that uses adaptive dynamic programming\n", "    on a given MDP and policy.\n", "    \"\"\"\n", "\n", "    class ModelMDP(BaseMDP):\n", "        \"\"\"Class for implementing modified Version of input MDP with\n", "        an editable transition model P and a custom function T.\"\"\"\n", "\n", "        def __init__(self, init, actlist, terminals, gamma, states):\n", "            super().__init__(init, actlist, terminals, states=states, gamma=gamma)\n", "            nested_dict = lambda: defaultdict(nested_dict)\n", "            self.P = nested_dict()\n", "\n", "        def T(self, s, a):\n", "            \"\"\"Return a list of tuples with probabilities for states\n", "            based on the learnt model P.\"\"\"\n", "            return [(prob, res) for (res, prob) in self.P[(s, a)].items()]\n", "\n", "    def __init__(self, pi, mdp):\n", "        self.pi = pi\n", "        self.mdp = PassiveADPAgent.ModelMDP(mdp.init, mdp.actlist,\n", "                                            mdp.terminals, mdp.gamma, mdp.states)\n", "        self.V = {}\n", "        self.Nsa = defaultdict(int)\n", "        self.Ns1_sa = defaultdict(int)\n", "        self.s = None\n", "        self.a = None\n", "        self.visited = set()  # keeping track of visited states\n", "\n", "    def __call__(self, percept):\n", "        s1, r1 = percept\n", "        mdp = self.mdp\n", "        R, P, terminals, pi = mdp.reward, mdp.P, mdp.terminals, self.pi\n", "        s, a, Nsa, Ns1_sa, V = self.s, self.a, self.Nsa, self.Ns1_sa, self.V\n", "\n", "        if s1 not in self.visited:  # Reward is only known for visited state.\n", "            V[s1] = R[s1] = r1\n", "            self.visited.add(s1)\n", "        if s is not None:\n", "            Nsa[(s, a)] += 1\n", "            Ns1_sa[(s1, s, a)] += 1\n", "            # for each t such that Ns′|sa [t, s, a] is nonzero\n", "            for t in [res for (res, state, act), freq in Ns1_sa.items()\n", "                      if (state, act) == (s, a) and freq != 0]:\n", "                P[(s, a)][t] = Ns1_sa[(t, s, a)] / Nsa[(s, a)]\n", "\n", "        self.V = policy_evaluation(pi, V, mdp)\n", "        self.Nsa, self.Ns1_sa = Nsa, Ns1_sa\n", "        if s1 in terminals:\n", "            self.s = self.a = None\n", "        else:\n", "            self.s, self.a = s1, self.pi[s1]\n", "        return self.a\n", "\n", "    def update_state(self, percept):\n", "        \"\"\"To be overridden in most cases. The default case\n", "        assumes the percept to be of type (state, reward).\"\"\"\n", "        return percept\n", "\n", "ADPagent = PassiveADPAgent(policy, sequential_decision_environment)\n", "for i in range(200):\n", "    run_single_trial(ADPagent, sequential_decision_environment)\n", "\n", "print('\\nCalculated Values are:\\n')\n", "print('\\n'.join([str(k)+':'+str(v) for k, v in ADPagent.V.items()]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Passive Temporal Difference Agent\n", "\n", "`PassiveTDAgent` uses temporal differences to learn value estimates. After each transition it takes the difference between the value estimates of the successive states and backs that difference up into the value of the preceding state." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Calculated Values are:\n", "\n", "(0, 1):0.3271371707951126\n", "(1, 2):0.5591257360823622\n", "(3, 2):1\n", "(0, 0):0.2932197194224822\n", "(3, 0):0.0\n", "(3, 1):-1\n", "(2, 1):0.3377577562939222\n", "(2, 0):0.0\n", "(2, 2):0.7359514729008128\n", "(1, 0):0.19716297821904394\n", "(0, 2):0.41234081001453976\n" ] } ], "source": [ "class PassiveTDAgent:\n", "    \"\"\"\n", "    The abstract class for a Passive (non-learning) agent that uses\n", "    temporal differences to learn value estimates. Override update_state\n", "    method to convert percept to state and reward. The mdp being provided\n", "    should be an instance of a subclass of the MDP Class.\n", "    \"\"\"\n", "\n", "    def __init__(self, pi, mdp, alpha=None):\n", "\n", "        self.pi = pi\n", "        self.V = {s: 0. for s in mdp.states}\n", "        self.Ns = {s: 0 for s in mdp.states}\n", "        self.s = None\n", "        self.a = None\n", "        self.r = None\n", "        self.gamma = mdp.gamma\n", "        self.terminals = mdp.terminals\n", "\n", "        if alpha:\n", "            self.alpha = alpha\n", "        else:\n", "            self.alpha = lambda n: 1 / (1 + n)\n", "\n", "    def __call__(self, percept):\n", "        s1, r1 = self.update_state(percept)\n", "        pi, V, Ns, s, r = self.pi, self.V, self.Ns, self.s, self.r\n", "        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals\n", "        if not Ns[s1]:\n", "            V[s1] = r1\n", "        if s is not None:\n", "            Ns[s] += 1\n", "            V[s] += alpha(Ns[s]) * (r + gamma * V[s1] - V[s])\n", "        if s1 in terminals:\n", "            self.s = self.a = self.r = None\n", "        else:\n", "            self.s, self.a, self.r = s1, pi[s1], r1\n", "        return self.a\n", "\n", "    def update_state(self, percept):\n", "        \"\"\"To be overridden in most cases. The default case\n", "        assumes the percept to be of type (state, reward).\"\"\"\n", "        return percept\n", "\n", "# Learning rate\n", "alpha = lambda n: 60./(59+n)\n", "\n", "TDagent = PassiveTDAgent(policy, sequential_decision_environment, alpha = alpha)\n", "for i in range(200):\n", "    run_single_trial(TDagent,sequential_decision_environment)\n", "\n", "print('\\nCalculated Values are:\\n')\n", "print('\\n'.join([str(k)+':'+str(v) for k, v in TDagent.V.items()]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Active Reinforcement Learning\n", "\n", "Unlike passive reinforcement learning, in active reinforcement learning we are not bound to a fixed policy $\\pi$; the agent must select its own actions. In other words, the agent needs to learn an optimal policy $\\pi^*$. 
The fundamental tradeoff the agent faces is that of **exploration** vs. **exploitation**.\n", "\n", "## Q-Learning Agent\n", "\n", "In Q-Learning the agent learns an action-value function $Q(s_t,a_t)$, which gives the value of taking a given action in a particular state. $Q$-values are related directly to values by:\n", "\n", "$$V(s) = \\operatorname{max}_{a} Q(s, a)$$\n", "\n", "\n", "Q-Learning does not require a transition model and hence is a **model-free** method. As with values, we can derive the update equation from Bellman's equations; for TD (Temporal Difference) Q-Learning it is:\n", "\n", "$$ Q(s_t,a_t) \\leftarrow Q(s_t,a_t) + \\alpha(R(s_t) + \\gamma \\operatorname{max}_{a_{t+1}} Q(s_{t+1},a_{t+1}) - Q(s_t,a_t)) $$\n", "\n", "This update is applied whenever we take action $a_t$ in state $s_t$, leading to state $s_{t+1}$. The update target always uses the best $Q$-value in $s_{t+1}$, regardless of which action the agent actually takes next while exploring; hence Q-learning is called an **off-policy** algorithm.\n", "\n", "The `QLearningAgent` class below implements an exploration function $f$, which returns a fixed $R_+$ (an optimistic estimate of the best possible reward attainable in any state) until the agent has tried a state-action pair at least $N_e$ times. The method `QLearningAgent.actions_in_state` returns the actions possible in a given state, which is useful when applying max and argmax operations." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "class QLearningAgent:\n", "    \"\"\"\n", "    An exploratory Q-learning agent. It avoids having to learn the transition\n", "    model because the Q-value of a state can be related directly to those of\n", "    its neighbors.\n", "    \"\"\"\n", "\n", "    def __init__(self, mdp, Ne, Rplus, alpha=None):\n", "\n", "        self.gamma = mdp.gamma\n", "        self.terminals = mdp.terminals\n", "        self.all_act = mdp.actlist\n", "        self.Ne = Ne  # iteration limit in exploration function\n", "        self.Rplus = Rplus  # large value to assign before iteration limit\n", "        self.Q = defaultdict(float)\n", "        self.Nsa = defaultdict(float)\n", "        self.s = None\n", "        self.a = None\n", "        self.r = None\n", "\n", "        if alpha:\n", "            self.alpha = alpha\n", "        else:\n", "            self.alpha = lambda n: 1. / (1 + n)\n", "\n", "    def f(self, u, n):\n", "        \"\"\"Exploration function. Returns fixed Rplus until\n", "        agent has visited state, action a Ne number of times.\n", "        Same as ADP agent\"\"\"\n", "        if n < self.Ne:\n", "            return self.Rplus\n", "        else:\n", "            return u\n", "\n", "    def actions_in_state(self, state):\n", "        \"\"\"Return actions possible in given state.\n", "        Useful for max and argmax.\"\"\"\n", "        if state in self.terminals:\n", "            return [None]\n", "        else:\n", "            return self.all_act\n", "\n", "    def __call__(self, percept):\n", "        s1, r1 = self.update_state(percept)\n", "        Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r\n", "        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals\n", "        actions_in_state = self.actions_in_state\n", "\n", "        if s in terminals:\n", "            Q[s, None] = r1\n", "        if s is not None:\n", "            Nsa[s, a] += 1\n", "            Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * max(Q[s1, a1]\n", "                                                           for a1 in actions_in_state(s1)) - Q[s, a])\n", "        if s in terminals:\n", "            self.s = self.a = self.r = None\n", "        else:\n", "            self.s, self.r = s1, r1\n", "            self.a = max(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1], Nsa[s1, a1]))\n", "        return self.a\n", "\n", "    def update_state(self, percept):\n", "        \"\"\"To be overridden in most cases. The default case\n", "        assumes the percept to be of type (state, reward).\"\"\"\n", "        return percept\n", "\n", "q_agent = QLearningAgent(sequential_decision_environment, Ne=5, Rplus=2,\n", "                         alpha=lambda n: 60./(59+n))\n", "for i in range(200):\n", "    run_single_trial(q_agent,sequential_decision_environment)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let us look at the Q-values. The keys are state-action pairs, where the actions are encoded as follows:\n", "\n", "* north = (0, 1)\n", "* south = (0,-1)\n", "* west = (-1, 0)\n", "* east = (1, 0)\n", "* Do nothing = None" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "defaultdict(float,\n", "            {((0, 0), (1, 0)): -0.18907011024435563,\n", "             ((0, 0), (0, 1)): 0.016055715142303573,\n", "             ((0, 0), (-1, 0)): -0.19414582836715144,\n", "             ((0, 0), (0, -1)): -0.197362264362732,\n", "             ((1, 0), (1, 0)): -0.13799385740012343,\n", "             ((1, 0), (0, 1)): -0.1955045464165534,\n", "             ((1, 0), (-1, 0)): -0.19234368038524843,\n", "             ((1, 0), (0, -1)): -0.2039606865759629,\n", "             ((2, 0), (1, 0)): -0.18680269373594197,\n", "             ((2, 0), (0, 1)): -0.10227424445451055,\n", "             ((2, 0), (-1, 0)): -0.19209191880753967,\n", "             ((2, 0), (0, -1)): -0.19412326272670127,\n", "             ((3, 0), (1, 0)): -0.5138163544867891,\n", "             ((3, 0), (0, 1)): -0.9104342940243259,\n", "             ((3, 0), (-1, 0)): -0.799633788177492,\n", "             ((3, 0), (0, -1)): -0.28011830872786025,\n", "             ((3, 1), None): -0.5609399712951512,\n", "             ((0, 1), (1, 0)): -0.16860689881582092,\n", "             ((0, 1), (0, 1)): 0.03331943373550701,\n", "             ((0, 1), (-1, 0)): -0.15340385689815017,\n", "             ((0, 1), (0, -1)): -0.1555735366525523,\n", "             ((0, 2), (1, 0)): 0.07948743858498493,\n", "             ((0, 2), (0, 1)): -0.10635561770608827,\n", "             ((0, 2), (-1, 0)): -0.10293706293706295,\n", "             ((0, 2), (0, -1)): -0.12112719741045669,\n", "             ((1, 2), (1, 0)): 0.17489854352873557,\n", "             ((1, 2), (0, 1)): -0.04,\n", "             ((1, 2), (-1, 0)): 
-0.0759999433406361,\n", "             ((1, 2), (0, -1)): -0.0759999433406361,\n", "             ((2, 2), (1, 0)): 0.2947289538798112,\n", "             ((2, 2), (0, 1)): 0.0782019494899496,\n", "             ((2, 2), (-1, 0)): 0.05370117293264979,\n", "             ((2, 2), (0, -1)): -0.005197935584750485,\n", "             ((3, 2), None): 0.3963241973633407,\n", "             ((2, 1), (1, 0)): -0.9021402660633233,\n", "             ((2, 1), (0, 1)): -0.2807786612937162,\n", "             ((2, 1), (-1, 0)): -0.05810436751079326,\n", "             ((2, 1), (0, -1)): -0.6123214831756907})" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "q_agent.Q" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The value $V$ of each state is related to $Q$ by:\n", "\n", "$$V(s) = \\operatorname{max}_{a} Q(s, a)$$\n", "\n", "Let us convert the $Q$-values above into $V$ estimates." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "defaultdict(<function __main__.<lambda>()>,\n", "            {(0, 0): 0.016055715142303573,\n", "             (1, 0): -0.13799385740012343,\n", "             (2, 0): -0.10227424445451055,\n", "             (3, 0): -0.28011830872786025,\n", "             (3, 1): -0.5609399712951512,\n", "             (0, 1): 0.03331943373550701,\n", "             (0, 2): 0.07948743858498493,\n", "             (1, 2): 0.17489854352873557,\n", "             (2, 2): 0.2947289538798112,\n", "             (3, 2): 0.3963241973633407,\n", "             (2, 1): -0.05810436751079326})" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "V = defaultdict(lambda: -1000.)  # Very large negative default, so any real Q-value wins the comparison below.\n", "for state_action, value in q_agent.Q.items():\n", "    state, action = state_action\n", "    if V[state] < value:\n", "        V[state] = value\n", "V" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generalization in Reinforcement Learning\n", "\n", "So far we have looked at value functions and Q-functions over small state spaces; however, tabular representations become computationally infeasible for large state spaces (e.g. more than 100,000 states). 
In the real world we typically face many more states than that. Games have very large state spaces too: in chess, for example, the number of possible games is estimated at around $10^{120}$. We cannot realistically visit all these states in order to learn to act optimally.\n", "\n", "We solve this using **function approximation**, where we learn a function to represent the Q-function or value function instead of a look-up table. This compression has the added benefit of allowing the agent to generalise from states it has visited to states it has not yet visited. For example, by examining only one in every $10^{12}$ states in the game of backgammon, it is possible to learn a value function that allows a program to play as well as any human.\n", "\n", "With function approximation, value estimation becomes a **supervised learning** problem. For the grid MDP example above, we could model the value estimate as:\n", "\n", "$$ \\hat{V}_\\theta (x,y) = f(x,y) = \\theta_0 + \\theta_1 x + \\theta_2 y $$\n", "\n", "Given a collection of trials, we obtain samples of the value at the visited states, which forms a readily solved supervised learning problem (illustrated here with a toy linear regression formulation, although $\\hat{V}_\\theta$ could equally be a neural network). We can also benefit from *online learning*, updating the parameters after each trial. If $v_j(s)$ is the observed total reward from state $s$ onward in the $j$th trial, we can use the following parameter update equation:\n", "\n", "$$ \\theta \\leftarrow \\theta + \\alpha (v_j(s) - \\hat{V}_\\theta(s))\\frac{\\partial \\hat{V}_\\theta(s)}{\\partial \\theta}$$\n", "\n", "This is also known as the **delta rule**. 
We can apply the same idea to temporal difference learners. The corresponding update for TD is:\n", "\n", "$$ \\theta \\leftarrow \\theta + \\alpha [R(s_t) + \\gamma \\hat{V}_\\theta(s_{t+1})-\\hat{V}_\\theta(s_t)]\\frac{\\partial \\hat{V}_\\theta(s_t)}{\\partial \\theta}$$\n", "\n", "and for Q-learning:\n", "\n", "$$ \\theta \\leftarrow \\theta + \\alpha [R(s_t) + \\gamma \\operatorname{max}_{a_{t+1}} \\hat{Q}_\\theta(s_{t+1}, a_{t+1})-\\hat{Q}_\\theta(s_t, a_t)]\\frac{\\partial \\hat{Q}_\\theta(s_t, a_t)}{\\partial \\theta}$$\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Policy search\n", "\n", "Policy search is a simple approach to reinforcement learning: we keep modifying the policy as long as its performance improves, and stop when it no longer does. We can parametrise a policy $\\pi$ (which maps states to actions), for example by a collection of parametrised Q-functions, i.e.\n", "\n", "$$ \\pi(s) = \\operatorname{arg\\,max}_a \\hat{Q}_\\theta(s, a) $$\n", "\n", "Note that policy search only looks for parameters $\\theta$ that give a well-performing policy (a good choice of action in each state); it does not need $\\hat{Q}_\\theta$ to be close to the optimal $Q^*$. The learned Q-function could, for example, be a scaled version of the actual $Q^*$ (e.g. $\\hat{Q}_\\theta(s, a) = Q^*(s,a)/20$) and still induce an optimal policy. One problem with learning such a policy is that, with discrete actions, it is a *discontinuous* function of the parameters, making it difficult to differentiate. This can be solved by using a **stochastic policy** $\\pi_\\theta(s,a)$ (the probability of selecting action $a$ in state $s$). One common representation is the softmax function:\n", "\n", "$$\\pi_\\theta(s,a) = \\frac{e^{\\hat{Q}_\\theta(s, a)}}{\\sum_{a'}e^{\\hat{Q}_\\theta(s, a')}} $$\n", "\n", "This gives a *continuous* function of $\\theta$, which is readily differentiable. To improve the policy we define a **policy value** $\\rho(\\theta)$, that is, the expected reward we get when following $\\pi_\\theta$. 
We can then follow the gradient of $\\rho(\\theta)$ uphill (gradient ascent, since we are maximising the policy value). However, this naive approach is inefficient due to the stochastic nature of the policy. One solution is the *REINFORCE* algorithm, which estimates the **policy value gradient** as:\n", "\n", "$$ \\nabla_\\theta \\rho(\\theta) \\approx \\frac{1}{N} \\sum^N_{j=1} \\frac{\\nabla_\\theta \\pi_\\theta(s, a_j)R_j(s)}{\\pi_\\theta(s,a_j)}$$\n", "\n", "where we have $N$ trials in all, the action taken on the $j$th trial is $a_j$, and $R_j(s)$ is the total reward received from state $s$ onward in the $j$th trial. Policy gradient methods can, in general, learn a general approximation to the policy; however, they suffer from high variance and therefore require a lot of samples, which makes sample efficiency a challenge in practice. On the other hand, they usually converge to at least a local optimum, which compares favourably with Q-learning with function approximation, for which there are fewer guarantees (as we approximate the Bellman equation with a complicated function approximator).\n", "\n", "\n", "# Action spaces\n", "\n", "So far we have only discussed **discrete action spaces** (a finite number of actions available for the agent to take); however, we can also have **continuous action spaces** (such as controlling a robot in Euclidean space). This affects the structure of the policy, for which there are two common variants:\n", "* **categorical policies** for **discrete action spaces**\n", "* **diagonal Gaussian policies** for **continuous action spaces**\n", "\n", "A diagonal Gaussian policy is a stochastic policy defined by a Gaussian distribution $\\mathcal{N}(\\mu, \\Sigma)$ whose covariance matrix $\\Sigma$ has non-zero entries only on the diagonal (so it can be represented as a vector). An action is sampled from this distribution (typically as the mean plus spherical Gaussian noise scaled by the standard deviations)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Summary\n", "\n", "In summary, the **agent** formulation determines the kind of information that must be learned. 
The three main agent formulations are:\n", "\n", "* **Model-based**: uses a world model $P$ and a value function $V$\n", "* **Model-free** (value based): uses an action-value function $Q$\n", "* **Reflex design** (policy based): uses a policy $\\pi$" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }