{ "cells": [ { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Solving Problems by Searching\n", "\n", "This notebook serves as supporting material for topics covered in **Chapter 3 - Solving Problems by Searching** and **Chapter 4 - Beyond Classical Search** from the book *Artificial Intelligence: A Modern Approach*. It uses implementations from the [search.py](https://github.com/aimacode/aima-python/blob/master/search.py) module. Let's start by importing everything from the search module." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from search import *\n", "from notebook import psource, heatmap, gaussian_kernel, show_map, final_path_colors, display_visual, plot_NQueens\n", "\n", "# Needed to hide warnings in the matplotlib sections\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CONTENTS\n", "\n", "* Overview\n", "* Problem\n", "* Node\n", "* Simple Problem Solving Agent\n", "* Search Algorithms Visualization\n", "* Breadth-First Tree Search\n", "* Breadth-First Search\n", "* Best First Search\n", "* Uniform Cost Search\n", "* Greedy Best First Search\n", "* A\\* Search\n", "* Hill Climbing\n", "* Simulated Annealing\n", "* Genetic Algorithm\n", "* AND-OR Graph Search\n", "* Online DFS Agent\n", "* LRTA* Agent" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## OVERVIEW\n", "\n", "Here, we learn about a specific kind of problem solving: building goal-based agents that can plan ahead to solve problems. In particular, we examine the navigation (route-finding) problem. We must begin by precisely defining **problems** and their **solutions**. 
We will look at several general-purpose search algorithms.\n", "\n", "Search algorithms can be classified into two types:\n", "\n", "* **Uninformed search algorithms**: These algorithms explore the search space using no information about the problem other than its definition (which includes the step costs).\n", " * Examples:\n", " 1. Breadth First Search\n", " 2. Depth First Search\n", " 3. Depth Limited Search\n", " 4. Iterative Deepening Search\n", " 5. Uniform Cost Search\n", "\n", "\n", "* **Informed search algorithms**: These algorithms use problem-specific knowledge beyond the problem definition, typically a heuristic estimate of the remaining cost, to find a solution more efficiently.\n", " * Examples:\n", " 1. Best First Search\n", " 2. A\\* Search\n", " 3. Recursive Best First Search\n", "\n", "*Don't miss the visualisations of these algorithms solving the route-finding problem defined on the Romania map at the end of this notebook.*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For visualisations, we use networkx and matplotlib to show the map in the notebook, and we use ipywidgets to interact with the map and see how each search algorithm works. These are imported as required in `notebook.py`." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "import networkx as nx\n", "import matplotlib.pyplot as plt\n", "from matplotlib import lines\n", "\n", "from ipywidgets import interact\n", "import ipywidgets as widgets\n", "from IPython.display import display\n", "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PROBLEM\n", "\n", "Let's see how we define a Problem. Run the next cell to see how the abstract class `Problem` is defined in the search module." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", "class Problem(object):\n",
"\n",
" """The abstract class for a formal problem. You should subclass\n",
" this and implement the methods actions and result, and possibly\n",
" __init__, goal_test, and path_cost. Then you will create instances\n",
" of your subclass and solve them with the various search functions."""\n",
"\n",
" def __init__(self, initial, goal=None):\n",
" """The constructor specifies the initial state, and possibly a goal\n",
" state, if there is a unique goal. Your subclass's constructor can add\n",
" other arguments."""\n",
" self.initial = initial\n",
" self.goal = goal\n",
"\n",
" def actions(self, state):\n",
" """Return the actions that can be executed in the given\n",
" state. The result would typically be a list, but if there are\n",
" many actions, consider yielding them one at a time in an\n",
" iterator, rather than building them all at once."""\n",
" raise NotImplementedError\n",
"\n",
" def result(self, state, action):\n",
" """Return the state that results from executing the given\n",
" action in the given state. The action must be one of\n",
" self.actions(state)."""\n",
" raise NotImplementedError\n",
"\n",
" def goal_test(self, state):\n",
" """Return True if the state is a goal. The default method compares the\n",
" state to self.goal or checks for state in self.goal if it is a\n",
" list, as specified in the constructor. Override this method if\n",
" checking against a single self.goal is not enough."""\n",
" if isinstance(self.goal, list):\n",
" return is_in(state, self.goal)\n",
" else:\n",
" return state == self.goal\n",
"\n",
" def path_cost(self, c, state1, action, state2):\n",
" """Return the cost of a solution path that arrives at state2 from\n",
" state1 via action, assuming cost c to get up to state1. If the problem\n",
" is such that the path doesn't matter, this function will only look at\n",
" state2. If the path does matter, it will consider c and maybe state1\n",
" and action. The default method costs 1 for every step in the path."""\n",
" return c + 1\n",
"\n",
" def value(self, state):\n",
" """For optimization problems, each state has a value. Hill-climbing\n",
" and related algorithms try to maximize this value."""\n",
" raise NotImplementedError\n",
"
class Node:\n",
"\n",
" """A node in a search tree. Contains a pointer to the parent (the node\n",
" that this is a successor of) and to the actual state for this node. Note\n",
" that if a state is arrived at by two paths, then there are two nodes with\n",
" the same state. Also includes the action that got us to this state, and\n",
" the total path_cost (also known as g) to reach the node. Other functions\n",
" may add an f and h value; see best_first_graph_search and astar_search for\n",
" an explanation of how the f and h values are handled. You will not need to\n",
" subclass this class."""\n",
"\n",
" def __init__(self, state, parent=None, action=None, path_cost=0):\n",
" """Create a search tree Node, derived from a parent by an action."""\n",
" self.state = state\n",
" self.parent = parent\n",
" self.action = action\n",
" self.path_cost = path_cost\n",
" self.depth = 0\n",
" if parent:\n",
" self.depth = parent.depth + 1\n",
"\n",
" def __repr__(self):\n",
" return "<Node {}>".format(self.state)\n",
"\n",
" def __lt__(self, node):\n",
" return self.state < node.state\n",
"\n",
" def expand(self, problem):\n",
" """List the nodes reachable in one step from this node."""\n",
" return [self.child_node(problem, action)\n",
" for action in problem.actions(self.state)]\n",
"\n",
" def child_node(self, problem, action):\n",
" """[Figure 3.10]"""\n",
" next_state = problem.result(self.state, action)\n",
" next_node = Node(next_state, self, action,\n",
" problem.path_cost(self.path_cost, self.state,\n",
" action, next_state))\n",
" return next_node\n",
" \n",
" def solution(self):\n",
" """Return the sequence of actions to go from the root to this node."""\n",
" return [node.action for node in self.path()[1:]]\n",
"\n",
" def path(self):\n",
" """Return a list of nodes forming the path from the root to this node."""\n",
" node, path_back = self, []\n",
" while node:\n",
" path_back.append(node)\n",
" node = node.parent\n",
" return list(reversed(path_back))\n",
"\n",
" # We want for a queue of nodes in breadth_first_graph_search or\n",
" # astar_search to have no duplicated states, so we treat nodes\n",
" # with the same state as equal. [Problem: this may not be what you\n",
" # want in other contexts.]\n",
"\n",
" def __eq__(self, other):\n",
" return isinstance(other, Node) and self.state == other.state\n",
"\n",
" def __hash__(self):\n",
" return hash(self.state)\n",
"
class GraphProblem(Problem):\n",
"\n",
" """The problem of searching a graph from one node to another."""\n",
"\n",
" def __init__(self, initial, goal, graph):\n",
" Problem.__init__(self, initial, goal)\n",
" self.graph = graph\n",
"\n",
" def actions(self, A):\n",
" """The actions at a graph node are just its neighbors."""\n",
" return list(self.graph.get(A).keys())\n",
"\n",
" def result(self, state, action):\n",
" """The result of going to a neighbor is just that neighbor."""\n",
" return action\n",
"\n",
" def path_cost(self, cost_so_far, A, action, B):\n",
" return cost_so_far + (self.graph.get(A, B) or infinity)\n",
"\n",
" def find_min_edge(self):\n",
" """Find minimum value of edges."""\n",
" m = infinity\n",
" for d in self.graph.graph_dict.values():\n",
" local_min = min(d.values())\n",
" m = min(m, local_min)\n",
"\n",
" return m\n",
"\n",
" def h(self, node):\n",
" """h function is straight-line distance from a node's state to goal."""\n",
" locs = getattr(self.graph, 'locations', None)\n",
" if locs:\n",
" if type(node) is str:\n",
" return int(distance(locs[node], locs[self.goal]))\n",
"\n",
" return int(distance(locs[node.state], locs[self.goal]))\n",
" else:\n",
" return infinity\n",
"
class SimpleProblemSolvingAgentProgram:\n",
"\n",
" """Abstract framework for a problem-solving agent. [Figure 3.1]"""\n",
"\n",
" def __init__(self, initial_state=None):\n",
" """State is an abstract representation of the state\n",
" of the world, and seq is the list of actions required\n",
" to get to a particular state from the initial state(root)."""\n",
" self.state = initial_state\n",
" self.seq = []\n",
"\n",
" def __call__(self, percept):\n",
" """[Figure 3.1] Formulate a goal and problem, then\n",
" search for a sequence of actions to solve it."""\n",
" self.state = self.update_state(self.state, percept)\n",
" if not self.seq:\n",
" goal = self.formulate_goal(self.state)\n",
" problem = self.formulate_problem(self.state, goal)\n",
" self.seq = self.search(problem)\n",
" if not self.seq:\n",
" return None\n",
" return self.seq.pop(0)\n",
"\n",
" def update_state(self, state, percept):\n",
" raise NotImplementedError\n",
"\n",
" def formulate_goal(self, state):\n",
" raise NotImplementedError\n",
"\n",
" def formulate_problem(self, state, goal):\n",
" raise NotImplementedError\n",
"\n",
" def search(self, problem):\n",
" raise NotImplementedError\n",
"
def recursive_best_first_search(problem, h=None):\n",
" """[Figure 3.26] Recursive best-first search (RBFS) is an\n",
" informed search algorithm. Like A*, it uses the evaluation function\n",
" f(n) = g(n) + h(n) to determine the next node to expand, making\n",
" it complete and, when the heuristic is admissible, optimal.\n",
" To reduce memory consumption, RBFS searches depth first\n",
" and only retains the best f values of its ancestors."""\n",
" h = memoize(h or problem.h, 'h')\n",
"\n",
" def RBFS(problem, node, flimit):\n",
" if problem.goal_test(node.state):\n",
" return node, 0 # (The second value is immaterial)\n",
" successors = node.expand(problem)\n",
" if len(successors) == 0:\n",
" return None, infinity\n",
" for s in successors:\n",
" s.f = max(s.path_cost + h(s), node.f)\n",
" while True:\n",
" # Order by lowest f value\n",
" successors.sort(key=lambda x: x.f)\n",
" best = successors[0]\n",
" if best.f > flimit:\n",
" return None, best.f\n",
" if len(successors) > 1:\n",
" alternative = successors[1].f\n",
" else:\n",
" alternative = infinity\n",
" result, best.f = RBFS(problem, best, min(flimit, alternative))\n",
" if result is not None:\n",
" return result, best.f\n",
"\n",
" node = Node(problem.initial)\n",
" node.f = h(node)\n",
" result, bestf = RBFS(problem, node, infinity)\n",
" return result\n",
"
def hill_climbing(problem):\n",
" """From the initial node, keep choosing the neighbor with highest value,\n",
" stopping when no neighbor is better. [Figure 4.2]"""\n",
" current = Node(problem.initial)\n",
" while True:\n",
" neighbors = current.expand(problem)\n",
" if not neighbors:\n",
" break\n",
" neighbor = argmax_random_tie(neighbors,\n",
" key=lambda node: problem.value(node.state))\n",
" if problem.value(neighbor.state) <= problem.value(current.state):\n",
" break\n",
" current = neighbor\n",
" return current.state\n",
"
def simulated_annealing(problem, schedule=exp_schedule()):\n",
" """[Figure 4.5] CAUTION: This differs from the pseudocode as it\n",
" returns a state instead of a Node."""\n",
" current = Node(problem.initial)\n",
" for t in range(sys.maxsize):\n",
" T = schedule(t)\n",
" if T == 0:\n",
" return current.state\n",
" neighbors = current.expand(problem)\n",
" if not neighbors:\n",
" return current.state\n",
" next_choice = random.choice(neighbors)\n",
" delta_e = problem.value(next_choice.state) - problem.value(current.state)\n",
" if delta_e > 0 or probability(math.exp(delta_e / T)):\n",
" current = next_choice\n",
"
def exp_schedule(k=20, lam=0.005, limit=100):\n",
" """One possible schedule function for simulated annealing"""\n",
" return lambda t: (k * math.exp(-lam * t) if t < limit else 0)\n",
"
def genetic_algorithm(population, fitness_fn, gene_pool=[0, 1], f_thres=None, ngen=1000, pmut=0.1):\n",
" """[Figure 4.8]"""\n",
" for i in range(ngen):\n",
" population = [mutate(recombine(*select(2, population, fitness_fn)), gene_pool, pmut)\n",
" for i in range(len(population))]\n",
"\n",
" fittest_individual = fitness_threshold(fitness_fn, f_thres, population)\n",
" if fittest_individual:\n",
" return fittest_individual\n",
"\n",
"\n",
" return argmax(population, key=fitness_fn)\n",
"
def recombine(x, y):\n",
" n = len(x)\n",
" c = random.randrange(0, n)\n",
" return x[:c] + y[c:]\n",
"
def mutate(x, gene_pool, pmut):\n",
" if random.uniform(0, 1) >= pmut:\n",
" return x\n",
"\n",
" n = len(x)\n",
" g = len(gene_pool)\n",
" c = random.randrange(0, n)\n",
" r = random.randrange(0, g)\n",
"\n",
" new_gene = gene_pool[r]\n",
" return x[:c] + [new_gene] + x[c+1:]\n",
"
def init_population(pop_number, gene_pool, state_length):\n",
" """Initializes population for genetic algorithm\n",
" pop_number : Number of individuals in population\n",
" gene_pool : List of possible values for individuals\n",
" state_length: The length of each individual"""\n",
" g = len(gene_pool)\n",
" population = []\n",
" for i in range(pop_number):\n",
" new_individual = [gene_pool[random.randrange(0, g)] for j in range(state_length)]\n",
" population.append(new_individual)\n",
"\n",
" return population\n",
"
class NQueensProblem(Problem):\n",
"\n",
"    """The problem of placing N queens on an NxN board with none attacking\n",
"    each other. A state is represented as an N-element tuple, where\n",
"    a value of r in the c-th entry means there is a queen at column c,\n",
"    row r, and a value of -1 means that the c-th column has not been\n",
"    filled in yet. We fill in columns left to right.\n",
"    >>> depth_first_tree_search(NQueensProblem(8))\n",
"    <Node (7, 3, 0, 2, 5, 1, 6, 4)>\n",
"    """\n",
"\n",
"    def __init__(self, N):\n",
"        self.N = N\n",
"        self.initial = tuple([-1] * N)\n",
"        Problem.__init__(self, self.initial)\n",
"\n",
"    def actions(self, state):\n",
"        """In the leftmost empty column, try all non-conflicting rows."""\n",
"        # Compare by value: `is not -1` relies on integer identity\n",
"        if state[-1] != -1:\n",
"            return []  # All columns filled; no successors\n",
"        else:\n",
"            col = state.index(-1)\n",
"            return [row for row in range(self.N)\n",
"                    if not self.conflicted(state, row, col)]\n",
"\n",
"    def result(self, state, row):\n",
"        """Place the next queen at the given row."""\n",
"        col = state.index(-1)\n",
"        new = list(state[:])\n",
"        new[col] = row\n",
"        return tuple(new)\n",
"\n",
"    def conflicted(self, state, row, col):\n",
"        """Would placing a queen at (row, col) conflict with anything?"""\n",
"        return any(self.conflict(row, col, state[c], c)\n",
"                   for c in range(col))\n",
"\n",
"    def conflict(self, row1, col1, row2, col2):\n",
"        """Would putting two queens in (row1, col1) and (row2, col2) conflict?"""\n",
"        return (row1 == row2 or  # same row\n",
"                col1 == col2 or  # same column\n",
"                row1 - col1 == row2 - col2 or  # same \\ diagonal\n",
"                row1 + col1 == row2 + col2)  # same / diagonal\n",
"\n",
"    def goal_test(self, state):\n",
"        """Check if all columns filled, no conflicts."""\n",
"        if state[-1] == -1:\n",
"            return False\n",
"        return not any(self.conflicted(state, state[col], col)\n",
"                       for col in range(len(state)))\n",
"\n",
"    def h(self, node):\n",
"        """Return the number of conflicts between queens for a given node.\n",
"        Each conflicting pair is counted twice, once from each queen."""\n",
"        num_conflicts = 0\n",
"        # enumerate yields (column, row): the index is the column\n",
"        for (c1, r1) in enumerate(node.state):\n",
"            for (c2, r2) in enumerate(node.state):\n",
"                if (r1, c1) != (r2, c2):\n",
"                    num_conflicts += self.conflict(r1, c1, r2, c2)\n",
"\n",
"        return num_conflicts\n",
"
def and_or_graph_search(problem):\n",
" """[Figure 4.11] Used when the environment is nondeterministic and fully observable.\n",
" Contains OR nodes where the agent is free to choose any action.\n",
" After every action there is an AND node which contains all possible states\n",
" the agent may reach due to the stochastic nature of the environment.\n",
" The agent must be able to handle all possible states of the AND node (as it\n",
" may end up in any of them).\n",
" Returns a conditional plan to reach a goal state,\n",
" or failure if no such plan exists."""\n",
"\n",
" # functions used by and_or_search\n",
" def or_search(state, problem, path):\n",
" """returns a plan as a list of actions"""\n",
" if problem.goal_test(state):\n",
" return []\n",
" if state in path:\n",
" return None\n",
" for action in problem.actions(state):\n",
" plan = and_search(problem.result(state, action),\n",
" problem, path + [state, ])\n",
" if plan is not None:\n",
" return [action, plan]\n",
"\n",
" def and_search(states, problem, path):\n",
" """Returns plan in form of dictionary where we take action plan[s] if we reach state s."""\n",
" plan = {}\n",
" for s in states:\n",
" plan[s] = or_search(s, problem, path)\n",
" if plan[s] is None:\n",
" return None\n",
" return plan\n",
"\n",
" # body of and or search\n",
" return or_search(problem.initial, problem, [])\n",
"
class OnlineDFSAgent:\n",
"\n",
"    """[Figure 4.21] The abstract class for an OnlineDFSAgent. Override\n",
"    update_state method to convert percept to state. While initializing\n",
"    the subclass a problem needs to be provided which is an instance of\n",
"    a subclass of the Problem class."""\n",
"\n",
"    def __init__(self, problem):\n",
"        self.problem = problem\n",
"        self.s = None\n",
"        self.a = None\n",
"        self.untried = dict()\n",
"        self.unbacktracked = dict()\n",
"        self.result = {}\n",
"\n",
"    def __call__(self, percept):\n",
"        s1 = self.update_state(percept)\n",
"        if self.problem.goal_test(s1):\n",
"            self.a = None\n",
"        else:\n",
"            if s1 not in self.untried:\n",
"                # Copy so that popping actions does not mutate the problem's list\n",
"                self.untried[s1] = list(self.problem.actions(s1))\n",
"            if self.s is not None:\n",
"                # .get avoids a KeyError the first time (s, a) is tried\n",
"                if s1 != self.result.get((self.s, self.a)):\n",
"                    self.result[(self.s, self.a)] = s1\n",
"                    self.unbacktracked.setdefault(s1, []).insert(0, self.s)\n",
"            if len(self.untried[s1]) == 0:\n",
"                if len(self.unbacktracked.get(s1, [])) == 0:\n",
"                    self.a = None\n",
"                else:\n",
"                    # a <- an action b such that result[s', b] = POP(unbacktracked[s'])\n",
"                    unbacktracked_pop = self.unbacktracked[s1].pop(0)\n",
"                    for (s, b), outcome in self.result.items():\n",
"                        if s == s1 and outcome == unbacktracked_pop:\n",
"                            self.a = b\n",
"                            break\n",
"            else:\n",
"                # Pop a single untried action, not the whole list for s1\n",
"                self.a = self.untried[s1].pop(0)\n",
"        self.s = s1\n",
"        return self.a\n",
"\n",
"    def update_state(self, percept):\n",
"        """To be overridden in most cases. The default case\n",
"        assumes the percept to be of type state."""\n",
"        return percept\n",
"
class LRTAStarAgent:\n",
"\n",
"    """ [Figure 4.24]\n",
"    Abstract class for LRTA*-Agent. A problem needs to be\n",
"    provided which is an instance of a subclass of the Problem class.\n",
"\n",
"    Takes an OnlineSearchProblem [Figure 4.23] as a problem.\n",
"    """\n",
"\n",
"    def __init__(self, problem):\n",
"        self.problem = problem\n",
"        # self.result = {} # no need as we are using problem.output\n",
"        self.H = {}\n",
"        self.s = None\n",
"        self.a = None\n",
"\n",
"    def __call__(self, s1):  # as of now s1 is a state rather than a percept\n",
"        if self.problem.goal_test(s1):\n",
"            self.a = None\n",
"            return self.a\n",
"        else:\n",
"            if s1 not in self.H:\n",
"                self.H[s1] = self.problem.h(s1)\n",
"            if self.s is not None:\n",
"                # self.result[(self.s, self.a)] = s1 # no need as we are using problem.output\n",
"\n",
"                # minimum cost for action b in problem.actions(s)\n",
"                self.H[self.s] = min(self.LRTA_cost(self.s, b, self.problem.output(self.s, b),\n",
"                                     self.H) for b in self.problem.actions(self.s))\n",
"\n",
"            # an action b in problem.actions(s1) that minimizes costs\n",
"            self.a = argmin(self.problem.actions(s1),\n",
"                            key=lambda b: self.LRTA_cost(s1, b, self.problem.output(s1, b), self.H))\n",
"\n",
"            self.s = s1\n",
"            return self.a\n",
"\n",
"    def LRTA_cost(self, s, a, s1, H):\n",
"        """Returns cost to move from state 's' to state 's1' plus\n",
"        estimated cost to get to goal from s1."""\n",
"        if s1 is None:\n",
"            return self.problem.h(s)\n",
"        else:\n",
"            # H[s1] may not have been recorded yet; in that case fall back\n",
"            # to the problem's heuristic estimate for s1\n",
"            if s1 in self.H:\n",
"                return self.problem.c(s, a, s1) + self.H[s1]\n",
"            return self.problem.c(s, a, s1) + self.problem.h(s1)\n",
"