{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Making Complex Decisions\n", "---\n", "\n", "This Jupyter notebook acts as supporting material for topics covered in **Chapter 17 Making Complex Decisions** of the book* Artificial Intelligence: A Modern Approach*. We make use of the implementations in mdp.py module. This notebook also includes a brief summary of the main topics as a review. Let us import everything from the mdp module to get started." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from mdp import *\n", "from notebook import psource, pseudocode, plot_pomdp_utility" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CONTENTS\n", "\n", "* Overview\n", "* MDP\n", "* Grid MDP\n", "* Value Iteration\n", " * Value Iteration Visualization\n", "* Policy Iteration\n", "* POMDPs\n", "* POMDP Value Iteration\n", " - Value Iteration Visualization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## OVERVIEW\n", "\n", "Before we start playing with the actual implementations let us review a couple of things about MDPs.\n", "\n", "- A stochastic process has the **Markov property** if the conditional probability distribution of future states of the process (conditional on both past and present states) depends only upon the present state, not on the sequence of events that preceded it.\n", "\n", " -- Source: [Wikipedia](https://en.wikipedia.org/wiki/Markov_property)\n", "\n", "Often it is possible to model many different phenomena as a Markov process by being flexible with our definition of state.\n", " \n", "\n", "- MDPs help us deal with fully-observable and non-deterministic/stochastic environments. For dealing with partially-observable and stochastic cases we make use of generalization of MDPs named POMDPs (partially observable Markov decision process).\n", "\n", "Our overall goal to solve a MDP is to come up with a policy which guides us to select the best action in each state so as to maximize the expected sum of future rewards." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## MDP\n", "\n", "To begin with let us look at the implementation of MDP class defined in mdp.py The docstring tells us what all is required to define a MDP namely - set of states, actions, initial state, transition model, and a reward function. Each of these are implemented as methods. Do not close the popup so that you can follow along the description of code below." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", "class MDP:\n",
"\n",
" """A Markov Decision Process, defined by an initial state, transition model,\n",
" and reward function. We also keep track of a gamma value, for use by\n",
" algorithms. The transition model is represented somewhat differently from\n",
" the text. Instead of P(s' | s, a) being a probability number for each\n",
" state/state/action triplet, we instead have T(s, a) return a\n",
" list of (p, s') pairs. We also keep track of the possible states,\n",
" terminal states, and actions for each state. [page 646]"""\n",
"\n",
" def __init__(self, init, actlist, terminals, transitions = {}, reward = None, states=None, gamma=.9):\n",
" if not (0 < gamma <= 1):\n",
" raise ValueError("An MDP must have 0 < gamma <= 1")\n",
"\n",
" if states:\n",
" self.states = states\n",
" else:\n",
" ## collect states from transitions table\n",
" self.states = self.get_states_from_transitions(transitions)\n",
" \n",
" \n",
" self.init = init\n",
" \n",
" if isinstance(actlist, list):\n",
" ## if actlist is a list, all states have the same actions\n",
" self.actlist = actlist\n",
" elif isinstance(actlist, dict):\n",
" ## if actlist is a dict, different actions for each state\n",
" self.actlist = actlist\n",
" \n",
" self.terminals = terminals\n",
" self.transitions = transitions\n",
" if self.transitions == {}:\n",
" print("Warning: Transition table is empty.")\n",
" self.gamma = gamma\n",
" if reward:\n",
" self.reward = reward\n",
" else:\n",
" self.reward = {s : 0 for s in self.states}\n",
" #self.check_consistency()\n",
"\n",
" def R(self, state):\n",
" """Return a numeric reward for this state."""\n",
" return self.reward[state]\n",
"\n",
" def T(self, state, action):\n",
" """Transition model. From a state and an action, return a list\n",
" of (probability, result-state) pairs."""\n",
" if(self.transitions == {}):\n",
" raise ValueError("Transition model is missing")\n",
" else:\n",
" return self.transitions[state][action]\n",
"\n",
" def actions(self, state):\n",
" """Set of actions that can be performed in this state. By default, a\n",
" fixed list of actions, except for terminal states. Override this\n",
" method if you need to specialize by state."""\n",
" if state in self.terminals:\n",
" return [None]\n",
" else:\n",
" return self.actlist\n",
"\n",
" def get_states_from_transitions(self, transitions):\n",
" if isinstance(transitions, dict):\n",
" s1 = set(transitions.keys())\n",
" s2 = set([tr[1] for actions in transitions.values() \n",
" for effects in actions.values() for tr in effects])\n",
" return s1.union(s2)\n",
" else:\n",
" print('Could not retrieve states from transitions')\n",
" return None\n",
"\n",
" def check_consistency(self):\n",
" # check that all states in transitions are valid\n",
" assert set(self.states) == self.get_states_from_transitions(self.transitions)\n",
" # check that init is a valid state\n",
" assert self.init in self.states\n",
" # check reward for each state\n",
" #assert set(self.reward.keys()) == set(self.states)\n",
" assert set(self.reward.keys()) == set(self.states)\n",
" # check that all terminals are valid states\n",
" assert all([t in self.states for t in self.terminals])\n",
" # check that probability distributions for all actions sum to 1\n",
" for s1, actions in self.transitions.items():\n",
" for a in actions.keys():\n",
" s = 0\n",
" for o in actions[a]:\n",
" s += o[0]\n",
" assert abs(s - 1) < 0.001\n",
"
class GridMDP(MDP):\n",
"\n",
" """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is\n",
" specify the grid as a list of lists of rewards; use None for an obstacle\n",
" (unreachable state). Also, you should specify the terminal states.\n",
" An action is an (x, y) unit vector; e.g. (1, 0) means move east."""\n",
"\n",
" def __init__(self, grid, terminals, init=(0, 0), gamma=.9):\n",
" grid.reverse() # because we want row 0 on bottom, not on top\n",
" reward = {}\n",
" states = set()\n",
" self.rows = len(grid)\n",
" self.cols = len(grid[0])\n",
" self.grid = grid\n",
" for x in range(self.cols):\n",
" for y in range(self.rows):\n",
" if grid[y][x] is not None:\n",
" states.add((x, y))\n",
" reward[(x, y)] = grid[y][x]\n",
" self.states = states\n",
" actlist = orientations\n",
" transitions = {}\n",
" for s in states:\n",
" transitions[s] = {}\n",
" for a in actlist:\n",
" transitions[s][a] = self.calculate_T(s, a)\n",
" MDP.__init__(self, init, actlist=actlist,\n",
" terminals=terminals, transitions = transitions, \n",
" reward = reward, states = states, gamma=gamma)\n",
"\n",
" def calculate_T(self, state, action):\n",
" if action is None:\n",
" return [(0.0, state)]\n",
" else:\n",
" return [(0.8, self.go(state, action)),\n",
" (0.1, self.go(state, turn_right(action))),\n",
" (0.1, self.go(state, turn_left(action)))]\n",
" \n",
" def T(self, state, action):\n",
" if action is None:\n",
" return [(0.0, state)]\n",
" else:\n",
" return self.transitions[state][action]\n",
" \n",
" def go(self, state, direction):\n",
" """Return the state that results from going in this direction."""\n",
" state1 = vector_add(state, direction)\n",
" return state1 if state1 in self.states else state\n",
"\n",
" def to_grid(self, mapping):\n",
" """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""\n",
" return list(reversed([[mapping.get((x, y), None)\n",
" for x in range(self.cols)]\n",
" for y in range(self.rows)]))\n",
"\n",
" def to_arrows(self, policy):\n",
" chars = {\n",
" (1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}\n",
" return self.to_grid({s: chars[a] for (s, a) in policy.items()})\n",
"
def value_iteration(mdp, epsilon=0.001):\n",
" """Solving an MDP by value iteration. [Figure 17.4]"""\n",
" U1 = {s: 0 for s in mdp.states}\n",
" R, T, gamma = mdp.R, mdp.T, mdp.gamma\n",
" while True:\n",
" U = U1.copy()\n",
" delta = 0\n",
" for s in mdp.states:\n",
" U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])\n",
" for a in mdp.actions(s)])\n",
" delta = max(delta, abs(U1[s] - U[s]))\n",
" if delta < epsilon * (1 - gamma) / gamma:\n",
" return U\n",
"
def expected_utility(a, s, U, mdp):\n",
" """The expected utility of doing a in state s, according to the MDP and U."""\n",
" return sum([p * U[s1] for (p, s1) in mdp.T(s, a)])\n",
"
def policy_iteration(mdp):\n",
" """Solve an MDP by policy iteration [Figure 17.7]"""\n",
" U = {s: 0 for s in mdp.states}\n",
" pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}\n",
" while True:\n",
" U = policy_evaluation(pi, U, mdp)\n",
" unchanged = True\n",
" for s in mdp.states:\n",
" a = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))\n",
" if a != pi[s]:\n",
" pi[s] = a\n",
" unchanged = False\n",
" if unchanged:\n",
" return pi\n",
"
def policy_evaluation(pi, U, mdp, k=20):\n",
" """Return an updated utility mapping U from each state in the MDP to its\n",
" utility, using an approximation (modified policy iteration)."""\n",
" R, T, gamma = mdp.R, mdp.T, mdp.gamma\n",
" for i in range(k):\n",
" for s in mdp.states:\n",
" U[s] = R(s) + gamma * sum([p * U[s1] for (p, s1) in T(s, pi[s])])\n",
" return U\n",
"
class POMDP(MDP):\n",
"\n",
" """A Partially Observable Markov Decision Process, defined by\n",
" a transition model P(s'|s,a), actions A(s), a reward function R(s),\n",
" and a sensor model P(e|s). We also keep track of a gamma value,\n",
" for use by algorithms. The transition and the sensor models\n",
" are defined as matrices. We also keep track of the possible states\n",
" and actions for each state. [page 659]."""\n",
"\n",
" def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):\n",
" """Initialize variables of the pomdp"""\n",
"\n",
" if not (0 < gamma <= 1):\n",
" raise ValueError('A POMDP must have 0 < gamma <= 1')\n",
"\n",
" self.states = states\n",
" self.actions = actions\n",
"\n",
" # transition model cannot be undefined\n",
" self.t_prob = transitions or {}\n",
" if not self.t_prob:\n",
" print('Warning: Transition model is undefined')\n",
" \n",
" # sensor model cannot be undefined\n",
" self.e_prob = evidences or {}\n",
" if not self.e_prob:\n",
" print('Warning: Sensor model is undefined')\n",
" \n",
" self.gamma = gamma\n",
" self.rewards = rewards\n",
"\n",
" def remove_dominated_plans(self, input_values):\n",
" """\n",
" Remove dominated plans.\n",
" This method finds all the lines contributing to the\n",
" upper surface and removes those which don't.\n",
" """\n",
"\n",
" values = [val for action in input_values for val in input_values[action]]\n",
" values.sort(key=lambda x: x[0], reverse=True)\n",
"\n",
" best = [values[0]]\n",
" y1_max = max(val[1] for val in values)\n",
" tgt = values[0]\n",
" prev_b = 0\n",
" prev_ix = 0\n",
" while tgt[1] != y1_max:\n",
" min_b = 1\n",
" min_ix = 0\n",
" for i in range(prev_ix + 1, len(values)):\n",
" if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:\n",
" trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])\n",
" if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:\n",
" min_b = trans_b\n",
" min_ix = i\n",
" prev_b = min_b\n",
" prev_ix = min_ix\n",
" tgt = values[min_ix]\n",
" best.append(tgt)\n",
"\n",
" return self.generate_mapping(best, input_values)\n",
"\n",
" def remove_dominated_plans_fast(self, input_values):\n",
" """\n",
" Remove dominated plans using approximations.\n",
" Resamples the upper boundary at intervals of 100 and\n",
" finds the maximum values at these points.\n",
" """\n",
"\n",
" values = [val for action in input_values for val in input_values[action]]\n",
" values.sort(key=lambda x: x[0], reverse=True)\n",
"\n",
" best = []\n",
" sr = 100\n",
" for i in range(sr + 1):\n",
" x = i / float(sr)\n",
" maximum = (values[0][1] - values[0][0]) * x + values[0][0]\n",
" tgt = values[0]\n",
" for value in values:\n",
" val = (value[1] - value[0]) * x + value[0]\n",
" if val > maximum:\n",
" maximum = val\n",
" tgt = value\n",
"\n",
" if all(any(tgt != v) for v in best):\n",
" best.append(tgt)\n",
"\n",
" return self.generate_mapping(best, input_values)\n",
"\n",
" def generate_mapping(self, best, input_values):\n",
" """Generate mappings after removing dominated plans"""\n",
"\n",
" mapping = defaultdict(list)\n",
" for value in best:\n",
" for action in input_values:\n",
" if any(all(value == v) for v in input_values[action]):\n",
" mapping[action].append(value)\n",
"\n",
" return mapping\n",
"\n",
" def max_difference(self, U1, U2):\n",
" """Find maximum difference between two utility mappings"""\n",
"\n",
" for k, v in U1.items():\n",
" sum1 = 0\n",
" for element in U1[k]:\n",
" sum1 += sum(element)\n",
" sum2 = 0\n",
" for element in U2[k]:\n",
" sum2 += sum(element)\n",
" return abs(sum1 - sum2)\n",
"
def pomdp_value_iteration(pomdp, epsilon=0.1):\n",
" """Solving a POMDP by value iteration."""\n",
"\n",
" U = {'':[[0]* len(pomdp.states)]}\n",
" count = 0\n",
" while True:\n",
" count += 1\n",
" prev_U = U\n",
" values = [val for action in U for val in U[action]]\n",
" value_matxs = []\n",
" for i in values:\n",
" for j in values:\n",
" value_matxs.append([i, j])\n",
"\n",
" U1 = defaultdict(list)\n",
" for action in pomdp.actions:\n",
" for u in value_matxs:\n",
" u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]])\n",
" u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])\n",
" U1[action].append(u1[0])\n",
"\n",
" U = pomdp.remove_dominated_plans_fast(U1)\n",
" # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations\n",
" \n",
" if count > 10:\n",
" if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:\n",
" return U\n",
"