{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Planning\n",
"#### Chapters 10-11\n",
"----"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook serves as supporting material for topics covered in **Chapter 10 - Classical Planning** and **Chapter 11 - Planning and Acting in the Real World** from the book *[Artificial Intelligence: A Modern Approach](http://aima.cs.berkeley.edu)*. \n",
"This notebook uses implementations from the [planning.py](https://github.com/aimacode/aima-python/blob/master/planning.py) module. \n",
"See the [intro notebook](https://github.com/aimacode/aima-python/blob/master/intro.ipynb) for instructions.\n",
"\n",
"We'll start by looking at `PlanningProblem` and `Action` data types for defining problems and actions. \n",
"Then, we will see how to use them by trying to plan a trip from *Sibiu* to *Bucharest* across the familiar map of Romania, from [search.ipynb](https://github.com/aimacode/aima-python/blob/master/search.ipynb) \n",
"followed by some common planning problems and methods of solving them.\n",
"\n",
"Let's start by importing everything from the planning module."
]
},
{
"cell_type": "code",
"execution_count": 79,
"metadata": {},
"outputs": [],
"source": [
"from planning import *\n",
"from notebook import psource"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## CONTENTS\n",
"\n",
"**Classical Planning**\n",
"- PlanningProblem\n",
"- Action\n",
"- Planning Problems\n",
" * Air cargo problem\n",
" * Spare tire problem\n",
" * Three block tower problem\n",
" * Shopping Problem\n",
" * Socks and shoes problem\n",
" * Cake problem\n",
"- Solving Planning Problems\n",
" * GraphPlan\n",
" * Linearize\n",
" * PartialOrderPlanner\n",
"
\n",
"\n",
"**Planning in the real world**\n",
"- Problem\n",
"- HLA\n",
"- Planning Problems\n",
" * Job shop problem\n",
" * Double tennis problem\n",
"- Solving Planning Problems\n",
" * Hierarchical Search\n",
" * Angelic Search"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PlanningProblem\n",
"\n",
"PDDL stands for Planning Domain Definition Language.\n",
"The `PlanningProblem` class is used to represent planning problems in this module. The following attributes are essential to be able to define a problem:\n",
"* an initial state\n",
"* a set of goals\n",
"* a set of viable actions that can be executed in the search space of the problem\n",
"\n",
"View the source to see how the Python code tries to realise these."
]
},
{
"cell_type": "code",
"execution_count": 80,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"
class PlanningProblem:\n",
" """\n",
" Planning Domain Definition Language (PlanningProblem) used to define a search problem.\n",
" It stores states in a knowledge base consisting of first order logic statements.\n",
" The conjunction of these logical statements completely defines a state.\n",
" """\n",
"\n",
" def __init__(self, init, goals, actions):\n",
" self.init = self.convert(init)\n",
" self.goals = self.convert(goals)\n",
" self.actions = actions\n",
"\n",
" def convert(self, clauses):\n",
" """Converts strings into exprs"""\n",
" if not isinstance(clauses, Expr):\n",
" if len(clauses) > 0:\n",
" clauses = expr(clauses)\n",
" else:\n",
" clauses = []\n",
" try:\n",
" clauses = conjuncts(clauses)\n",
" except AttributeError:\n",
" clauses = clauses\n",
"\n",
" new_clauses = []\n",
" for clause in clauses:\n",
" if clause.op == '~':\n",
" new_clauses.append(expr('Not' + str(clause.args[0])))\n",
" else:\n",
" new_clauses.append(clause)\n",
" return new_clauses\n",
"\n",
" def goal_test(self):\n",
" """Checks if the goals have been reached"""\n",
" return all(goal in self.init for goal in self.goals)\n",
"\n",
" def act(self, action):\n",
" """\n",
" Performs the action given as argument.\n",
" Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')\n",
" """ \n",
" action_name = action.op\n",
" args = action.args\n",
" list_action = first(a for a in self.actions if a.name == action_name)\n",
" if list_action is None:\n",
" raise Exception("Action '{}' not found".format(action_name))\n",
" if not list_action.check_precond(self.init, args):\n",
" raise Exception("Action '{}' pre-conditions not satisfied".format(action))\n",
" self.init = list_action(self.init, args).clauses\n",
"
class Action:\n",
" """\n",
" Defines an action schema using preconditions and effects.\n",
" Use this to describe actions in PlanningProblem.\n",
" action is an Expr where variables are given as arguments(args).\n",
" Precondition and effect are both lists with positive and negative literals.\n",
" Negative preconditions and effects are defined by adding a 'Not' before the name of the clause\n",
" Example:\n",
" precond = [expr("Human(person)"), expr("Hungry(Person)"), expr("NotEaten(food)")]\n",
" effect = [expr("Eaten(food)"), expr("Hungry(person)")]\n",
" eat = Action(expr("Eat(person, food)"), precond, effect)\n",
" """\n",
"\n",
" def __init__(self, action, precond, effect):\n",
" if isinstance(action, str):\n",
" action = expr(action)\n",
" self.name = action.op\n",
" self.args = action.args\n",
" self.precond = self.convert(precond)\n",
" self.effect = self.convert(effect)\n",
"\n",
" def __call__(self, kb, args):\n",
" return self.act(kb, args)\n",
"\n",
" def __repr__(self):\n",
" return '{}({})'.format(self.__class__.__name__, Expr(self.name, *self.args))\n",
"\n",
" def convert(self, clauses):\n",
" """Converts strings into Exprs"""\n",
" if isinstance(clauses, Expr):\n",
" clauses = conjuncts(clauses)\n",
" for i in range(len(clauses)):\n",
" if clauses[i].op == '~':\n",
" clauses[i] = expr('Not' + str(clauses[i].args[0]))\n",
"\n",
" elif isinstance(clauses, str):\n",
" clauses = clauses.replace('~', 'Not')\n",
" if len(clauses) > 0:\n",
" clauses = expr(clauses)\n",
"\n",
" try:\n",
" clauses = conjuncts(clauses)\n",
" except AttributeError:\n",
" pass\n",
"\n",
" return clauses\n",
"\n",
" def substitute(self, e, args):\n",
" """Replaces variables in expression with their respective Propositional symbol"""\n",
"\n",
" new_args = list(e.args)\n",
" for num, x in enumerate(e.args):\n",
" for i, _ in enumerate(self.args):\n",
" if self.args[i] == x:\n",
" new_args[num] = args[i]\n",
" return Expr(e.op, *new_args)\n",
"\n",
" def check_precond(self, kb, args):\n",
" """Checks if the precondition is satisfied in the current state"""\n",
"\n",
" if isinstance(kb, list):\n",
" kb = FolKB(kb)\n",
" for clause in self.precond:\n",
" if self.substitute(clause, args) not in kb.clauses:\n",
" return False\n",
" return True\n",
"\n",
" def act(self, kb, args):\n",
" """Executes the action on the state's knowledge base"""\n",
"\n",
" if isinstance(kb, list):\n",
" kb = FolKB(kb)\n",
"\n",
" if not self.check_precond(kb, args):\n",
" raise Exception('Action pre-conditions not satisfied')\n",
" for clause in self.effect:\n",
" kb.tell(self.substitute(clause, args))\n",
" if clause.op[:3] == 'Not':\n",
" new_clause = Expr(clause.op[3:], *clause.args)\n",
"\n",
" if kb.ask(self.substitute(new_clause, args)) is not False:\n",
" kb.retract(self.substitute(new_clause, args))\n",
" else:\n",
" new_clause = Expr('Not' + clause.op, *clause.args)\n",
"\n",
" if kb.ask(self.substitute(new_clause, args)) is not False: \n",
" kb.retract(self.substitute(new_clause, args))\n",
"\n",
" return kb\n",
"
def air_cargo():\n",
" """\n",
" [Figure 10.1] AIR-CARGO-PROBLEM\n",
"\n",
" An air-cargo shipment problem for delivering cargo to different locations,\n",
" given the starting location and airplanes.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> ac = air_cargo()\n",
" >>> ac.goal_test()\n",
" False\n",
" >>> ac.act(expr('Load(C2, P2, JFK)'))\n",
" >>> ac.act(expr('Load(C1, P1, SFO)'))\n",
" >>> ac.act(expr('Fly(P1, SFO, JFK)'))\n",
" >>> ac.act(expr('Fly(P2, JFK, SFO)'))\n",
" >>> ac.act(expr('Unload(C2, P2, SFO)'))\n",
" >>> ac.goal_test()\n",
" False\n",
" >>> ac.act(expr('Unload(C1, P1, JFK)'))\n",
" >>> ac.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK) & Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)', \n",
" goals='At(C1, JFK) & At(C2, SFO)',\n",
" actions=[Action('Load(c, p, a)', \n",
" precond='At(c, a) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
" effect='In(c, p) & ~At(c, a)'),\n",
" Action('Unload(c, p, a)',\n",
" precond='In(c, p) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
" effect='At(c, a) & ~In(c, p)'),\n",
" Action('Fly(p, f, to)',\n",
" precond='At(p, f) & Plane(p) & Airport(f) & Airport(to)',\n",
" effect='At(p, to) & ~At(p, f)')])\n",
"
def spare_tire():\n",
" """[Figure 10.2] SPARE-TIRE-PROBLEM\n",
"\n",
" A problem involving changing the flat tire of a car\n",
" with a spare tire from the trunk.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> st = spare_tire()\n",
" >>> st.goal_test()\n",
" False\n",
" >>> st.act(expr('Remove(Spare, Trunk)'))\n",
" >>> st.act(expr('Remove(Flat, Axle)'))\n",
" >>> st.goal_test()\n",
" False\n",
" >>> st.act(expr('PutOn(Spare, Axle)'))\n",
" >>> st.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='Tire(Flat) & Tire(Spare) & At(Flat, Axle) & At(Spare, Trunk)',\n",
" goals='At(Spare, Axle) & At(Flat, Ground)',\n",
" actions=[Action('Remove(obj, loc)',\n",
" precond='At(obj, loc)',\n",
" effect='At(obj, Ground) & ~At(obj, loc)'),\n",
" Action('PutOn(t, Axle)',\n",
" precond='Tire(t) & At(t, Ground) & ~At(Flat, Axle)',\n",
" effect='At(t, Axle) & ~At(t, Ground)'),\n",
" Action('LeaveOvernight',\n",
" precond='',\n",
" effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \\\n",
" ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')])\n",
"
def three_block_tower():\n",
" """\n",
" [Figure 10.3] THREE-BLOCK-TOWER\n",
"\n",
" A blocks-world problem of stacking three blocks in a certain configuration,\n",
" also known as the Sussman Anomaly.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> tbt = three_block_tower()\n",
" >>> tbt.goal_test()\n",
" False\n",
" >>> tbt.act(expr('MoveToTable(C, A)'))\n",
" >>> tbt.act(expr('Move(B, Table, C)'))\n",
" >>> tbt.goal_test()\n",
" False\n",
" >>> tbt.act(expr('Move(A, Table, B)'))\n",
" >>> tbt.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='On(A, Table) & On(B, Table) & On(C, A) & Block(A) & Block(B) & Block(C) & Clear(B) & Clear(C)',\n",
" goals='On(A, B) & On(B, C)',\n",
" actions=[Action('Move(b, x, y)',\n",
" precond='On(b, x) & Clear(b) & Clear(y) & Block(b) & Block(y)',\n",
" effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)'),\n",
" Action('MoveToTable(b, x)',\n",
" precond='On(b, x) & Clear(b) & Block(b)',\n",
" effect='On(b, Table) & Clear(x) & ~On(b, x)')])\n",
"
def simple_blocks_world():\n",
" """\n",
" SIMPLE-BLOCKS-WORLD\n",
"\n",
" A simplified definition of the Sussman Anomaly problem.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> sbw = simple_blocks_world()\n",
" >>> sbw.goal_test()\n",
" False\n",
" >>> sbw.act(expr('ToTable(A, B)'))\n",
" >>> sbw.act(expr('FromTable(B, A)'))\n",
" >>> sbw.goal_test()\n",
" False\n",
" >>> sbw.act(expr('FromTable(C, B)'))\n",
" >>> sbw.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='On(A, B) & Clear(A) & OnTable(B) & OnTable(C) & Clear(C)',\n",
" goals='On(B, A) & On(C, B)',\n",
" actions=[Action('ToTable(x, y)',\n",
" precond='On(x, y) & Clear(x)',\n",
" effect='~On(x, y) & Clear(y) & OnTable(x)'),\n",
" Action('FromTable(y, x)',\n",
" precond='OnTable(y) & Clear(y) & Clear(x)',\n",
" effect='~OnTable(y) & ~Clear(x) & On(y, x)')])\n",
"
def shopping_problem():\n",
" """\n",
" SHOPPING-PROBLEM\n",
"\n",
" A problem of acquiring some items given their availability at certain stores.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> sp = shopping_problem()\n",
" >>> sp.goal_test()\n",
" False\n",
" >>> sp.act(expr('Go(Home, HW)'))\n",
" >>> sp.act(expr('Buy(Drill, HW)'))\n",
" >>> sp.act(expr('Go(HW, SM)'))\n",
" >>> sp.act(expr('Buy(Banana, SM)'))\n",
" >>> sp.goal_test()\n",
" False\n",
" >>> sp.act(expr('Buy(Milk, SM)'))\n",
" >>> sp.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)',\n",
" goals='Have(Milk) & Have(Banana) & Have(Drill)', \n",
" actions=[Action('Buy(x, store)',\n",
" precond='At(store) & Sells(store, x)',\n",
" effect='Have(x)'),\n",
" Action('Go(x, y)',\n",
" precond='At(x)',\n",
" effect='At(y) & ~At(x)')])\n",
"
def socks_and_shoes():\n",
" """\n",
" SOCKS-AND-SHOES-PROBLEM\n",
"\n",
" A task of wearing socks and shoes on both feet\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> ss = socks_and_shoes()\n",
" >>> ss.goal_test()\n",
" False\n",
" >>> ss.act(expr('RightSock'))\n",
" >>> ss.act(expr('RightShoe'))\n",
" >>> ss.act(expr('LeftSock'))\n",
" >>> ss.goal_test()\n",
" False\n",
" >>> ss.act(expr('LeftShoe'))\n",
" >>> ss.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='',\n",
" goals='RightShoeOn & LeftShoeOn',\n",
" actions=[Action('RightShoe',\n",
" precond='RightSockOn',\n",
" effect='RightShoeOn'),\n",
" Action('RightSock',\n",
" precond='',\n",
" effect='RightSockOn'),\n",
" Action('LeftShoe',\n",
" precond='LeftSockOn',\n",
" effect='LeftShoeOn'),\n",
" Action('LeftSock',\n",
" precond='',\n",
" effect='LeftSockOn')])\n",
"
def have_cake_and_eat_cake_too():\n",
" """\n",
" [Figure 10.7] CAKE-PROBLEM\n",
"\n",
" A problem where we begin with a cake and want to \n",
" reach the state of having a cake and having eaten a cake.\n",
" The possible actions include baking a cake and eating a cake.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> cp = have_cake_and_eat_cake_too()\n",
" >>> cp.goal_test()\n",
" False\n",
" >>> cp.act(expr('Eat(Cake)'))\n",
" >>> cp.goal_test()\n",
" False\n",
" >>> cp.act(expr('Bake(Cake)'))\n",
" >>> cp.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='Have(Cake)',\n",
" goals='Have(Cake) & Eaten(Cake)',\n",
" actions=[Action('Eat(Cake)',\n",
" precond='Have(Cake)',\n",
" effect='Eaten(Cake) & ~Have(Cake)'),\n",
" Action('Bake(Cake)',\n",
" precond='~Have(Cake)',\n",
" effect='Have(Cake)')])\n",
"
class Problem(PlanningProblem):\n",
" """\n",
" Define real-world problems by aggregating resources as numerical quantities instead of\n",
" named entities.\n",
"\n",
" This class is identical to PDLL, except that it overloads the act function to handle\n",
" resource and ordering conditions imposed by HLA as opposed to Action.\n",
" """\n",
" def __init__(self, init, goals, actions, jobs=None, resources=None):\n",
" super().__init__(init, goals, actions)\n",
" self.jobs = jobs\n",
" self.resources = resources or {}\n",
"\n",
" def act(self, action):\n",
" """\n",
" Performs the HLA given as argument.\n",
"\n",
" Note that this is different from the superclass action - where the parameter was an\n",
" Expression. For real world problems, an Expr object isn't enough to capture all the\n",
" detail required for executing the action - resources, preconditions, etc need to be\n",
" checked for too.\n",
" """\n",
" args = action.args\n",
" list_action = first(a for a in self.actions if a.name == action.name)\n",
" if list_action is None:\n",
" raise Exception("Action '{}' not found".format(action.name))\n",
" self.init = list_action.do_action(self.jobs, self.resources, self.init, args).clauses\n",
"\n",
" def refinements(hla, state, library): # refinements may be (multiple) HLA themselves ...\n",
" """\n",
" state is a Problem, containing the current state kb\n",
" library is a dictionary containing details for every possible refinement. eg:\n",
" {\n",
" 'HLA': [\n",
" 'Go(Home, SFO)',\n",
" 'Go(Home, SFO)',\n",
" 'Drive(Home, SFOLongTermParking)',\n",
" 'Shuttle(SFOLongTermParking, SFO)',\n",
" 'Taxi(Home, SFO)'\n",
" ],\n",
" 'steps': [\n",
" ['Drive(Home, SFOLongTermParking)', 'Shuttle(SFOLongTermParking, SFO)'],\n",
" ['Taxi(Home, SFO)'],\n",
" [],\n",
" [],\n",
" []\n",
" ],\n",
" # empty refinements indicate a primitive action\n",
" 'precond': [\n",
" ['At(Home) & Have(Car)'],\n",
" ['At(Home)'],\n",
" ['At(Home) & Have(Car)'],\n",
" ['At(SFOLongTermParking)'],\n",
" ['At(Home)']\n",
" ],\n",
" 'effect': [\n",
" ['At(SFO) & ~At(Home)'],\n",
" ['At(SFO) & ~At(Home)'],\n",
" ['At(SFOLongTermParking) & ~At(Home)'],\n",
" ['At(SFO) & ~At(SFOLongTermParking)'],\n",
" ['At(SFO) & ~At(Home)']\n",
" ]\n",
" }\n",
" """\n",
" e = Expr(hla.name, hla.args)\n",
" indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == hla.name]\n",
" for i in indices:\n",
" actions = []\n",
" for j in range(len(library['steps'][i])):\n",
" # find the index of the step [j] of the HLA \n",
" index_step = [k for k,x in enumerate(library['HLA']) if x == library['steps'][i][j]][0]\n",
" precond = library['precond'][index_step][0] # preconditions of step [j]\n",
" effect = library['effect'][index_step][0] # effect of step [j]\n",
" actions.append(HLA(library['steps'][i][j], precond, effect))\n",
" yield actions\n",
"\n",
" def hierarchical_search(problem, hierarchy):\n",
" """\n",
" [Figure 11.5] 'Hierarchical Search, a Breadth First Search implementation of Hierarchical\n",
" Forward Planning Search'\n",
" The problem is a real-world problem defined by the problem class, and the hierarchy is\n",
" a dictionary of HLA - refinements (see refinements generator for details)\n",
" """\n",
" act = Node(problem.actions[0])\n",
" frontier = deque()\n",
" frontier.append(act)\n",
" while True:\n",
" if not frontier:\n",
" return None\n",
" plan = frontier.popleft()\n",
" print(plan.state.name)\n",
" hla = plan.state # first_or_null(plan)\n",
" prefix = None\n",
" if plan.parent:\n",
" prefix = plan.parent.state.action # prefix, suffix = subseq(plan.state, hla)\n",
" outcome = Problem.result(problem, prefix)\n",
" if hla is None:\n",
" if outcome.goal_test():\n",
" return plan.path()\n",
" else:\n",
" print("else")\n",
" for sequence in Problem.refinements(hla, outcome, hierarchy):\n",
" print("...")\n",
" frontier.append(Node(plan.state, plan.parent, sequence))\n",
"\n",
" def result(state, actions):\n",
" """The outcome of applying an action to the current problem"""\n",
" for a in actions: \n",
" if a.check_precond(state, a.args):\n",
" state = a(state, a.args).clauses\n",
" return state\n",
" \n",
"\n",
" def angelic_search(problem, hierarchy, initialPlan):\n",
" """\n",
"\t[Figure 11.8] A hierarchical planning algorithm that uses angelic semantics to identify and\n",
"\tcommit to high-level plans that work while avoiding high-level plans that don’t. \n",
"\tThe predicate MAKING-PROGRESS checks to make sure that we aren’t stuck in an infinite regression\n",
"\tof refinements. \n",
"\tAt top level, call ANGELIC -SEARCH with [Act ] as the initialPlan .\n",
"\n",
" initialPlan contains a sequence of HLA's with angelic semantics \n",
"\n",
" The possible effects of an angelic HLA in initialPlan are : \n",
" ~ : effect remove\n",
" $+: effect possibly add\n",
" $-: effect possibly remove\n",
" $$: possibly add or remove\n",
"\t"""\n",
" frontier = deque(initialPlan)\n",
" while True: \n",
" if not frontier:\n",
" return None\n",
" plan = frontier.popleft() # sequence of HLA/Angelic HLA's \n",
" opt_reachable_set = Problem.reach_opt(problem.init, plan)\n",
" pes_reachable_set = Problem.reach_pes(problem.init, plan)\n",
" if problem.intersects_goal(opt_reachable_set): \n",
" if Problem.is_primitive( plan, hierarchy ): \n",
" return ([x for x in plan.action])\n",
" guaranteed = problem.intersects_goal(pes_reachable_set) \n",
" if guaranteed and Problem.making_progress(plan, plan):\n",
" final_state = guaranteed[0] # any element of guaranteed \n",
" #print('decompose')\n",
" return Problem.decompose(hierarchy, problem, plan, final_state, pes_reachable_set)\n",
" (hla, index) = Problem.find_hla(plan, hierarchy) # there should be at least one HLA/Angelic_HLA, otherwise plan would be primitive.\n",
" prefix = plan.action[:index-1]\n",
" suffix = plan.action[index+1:]\n",
" outcome = Problem(Problem.result(problem.init, prefix), problem.goals , problem.actions )\n",
" for sequence in Problem.refinements(hla, outcome, hierarchy): # find refinements\n",
" frontier.append(Angelic_Node(outcome.init, plan, prefix + sequence+ suffix, prefix+sequence+suffix))\n",
"\n",
"\n",
" def intersects_goal(problem, reachable_set):\n",
" """\n",
" Find the intersection of the reachable states and the goal\n",
" """\n",
" return [y for x in list(reachable_set.keys()) for y in reachable_set[x] if all(goal in y for goal in problem.goals)] \n",
"\n",
"\n",
" def is_primitive(plan, library):\n",
" """\n",
" checks if the hla is primitive action \n",
" """\n",
" for hla in plan.action: \n",
" indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == hla.name]\n",
" for i in indices:\n",
" if library["steps"][i]: \n",
" return False\n",
" return True\n",
" \n",
"\n",
"\n",
" def reach_opt(init, plan): \n",
" """\n",
" Finds the optimistic reachable set of the sequence of actions in plan \n",
" """\n",
" reachable_set = {0: [init]}\n",
" optimistic_description = plan.action #list of angelic actions with optimistic description\n",
" return Problem.find_reachable_set(reachable_set, optimistic_description)\n",
" \n",
"\n",
" def reach_pes(init, plan): \n",
" """ \n",
" Finds the pessimistic reachable set of the sequence of actions in plan\n",
" """\n",
" reachable_set = {0: [init]}\n",
" pessimistic_description = plan.action_pes # list of angelic actions with pessimistic description\n",
" return Problem.find_reachable_set(reachable_set, pessimistic_description)\n",
"\n",
" def find_reachable_set(reachable_set, action_description):\n",
" """\n",
"\tFinds the reachable states of the action_description when applied in each state of reachable set.\n",
"\t"""\n",
" for i in range(len(action_description)):\n",
" reachable_set[i+1]=[]\n",
" if type(action_description[i]) is Angelic_HLA:\n",
" possible_actions = action_description[i].angelic_action()\n",
" else: \n",
" possible_actions = action_description\n",
" for action in possible_actions:\n",
" for state in reachable_set[i]:\n",
" if action.check_precond(state , action.args) :\n",
" if action.effect[0] :\n",
" new_state = action(state, action.args).clauses\n",
" reachable_set[i+1].append(new_state)\n",
" else: \n",
" reachable_set[i+1].append(state)\n",
" return reachable_set\n",
"\n",
" def find_hla(plan, hierarchy):\n",
" """\n",
" Finds the the first HLA action in plan.action, which is not primitive\n",
" and its corresponding index in plan.action\n",
" """\n",
" hla = None\n",
" index = len(plan.action)\n",
" for i in range(len(plan.action)): # find the first HLA in plan, that is not primitive\n",
" if not Problem.is_primitive(Node(plan.state, plan.parent, [plan.action[i]]), hierarchy):\n",
" hla = plan.action[i] \n",
" index = i\n",
" break\n",
" return (hla, index)\n",
"\t\n",
" def making_progress(plan, initialPlan):\n",
" """ \n",
" Not correct\n",
"\n",
" Normally should from infinite regression of refinements \n",
" \n",
" Only case covered: when plan contains one action (then there is no regression to be done) \n",
" """\n",
" if (len(plan.action)==1):\n",
" return False\n",
" return True \n",
"\n",
" def decompose(hierarchy, s_0, plan, s_f, reachable_set):\n",
" solution = [] \n",
" while plan.action_pes: \n",
" action = plan.action_pes.pop()\n",
" i = max(reachable_set.keys())\n",
" if (i==0): \n",
" return solution\n",
" s_i = Problem.find_previous_state(s_f, reachable_set,i, action) \n",
" problem = Problem(s_i, s_f , plan.action)\n",
" j=0\n",
" for x in Problem.angelic_search(problem, hierarchy, [Angelic_Node(s_i, Node(None), [action],[action])]):\n",
" solution.insert(j,x)\n",
" j+=1\n",
" s_f = s_i\n",
" return solution\n",
"\n",
"\n",
" def find_previous_state(s_f, reachable_set, i, action):\n",
" """\n",
" Given a final state s_f and an action finds a state s_i in reachable_set \n",
" such that when action is applied to state s_i returns s_f. \n",
" """\n",
" s_i = reachable_set[i-1][0]\n",
" for state in reachable_set[i-1]:\n",
" if s_f in [x for x in Problem.reach_pes(state, Angelic_Node(state, None, [action],[action]))[1]]:\n",
" s_i =state\n",
" break\n",
" return s_i\n",
"
class HLA(Action):\n",
" """\n",
" Define Actions for the real-world (that may be refined further), and satisfy resource\n",
" constraints.\n",
" """\n",
" unique_group = 1\n",
"\n",
" def __init__(self, action, precond=None, effect=None, duration=0,\n",
" consume=None, use=None):\n",
" """\n",
" As opposed to actions, to define HLA, we have added constraints.\n",
" duration holds the amount of time required to execute the task\n",
" consumes holds a dictionary representing the resources the task consumes\n",
" uses holds a dictionary representing the resources the task uses\n",
" """\n",
" precond = precond or [None]\n",
" effect = effect or [None]\n",
" super().__init__(action, precond, effect)\n",
" self.duration = duration\n",
" self.consumes = consume or {}\n",
" self.uses = use or {}\n",
" self.completed = False\n",
" # self.priority = -1 # must be assigned in relation to other HLAs\n",
" # self.job_group = -1 # must be assigned in relation to other HLAs\n",
"\n",
" def do_action(self, job_order, available_resources, kb, args):\n",
" """\n",
" An HLA based version of act - along with knowledge base updation, it handles\n",
" resource checks, and ensures the actions are executed in the correct order.\n",
" """\n",
" # print(self.name)\n",
" if not self.has_usable_resource(available_resources):\n",
" raise Exception('Not enough usable resources to execute {}'.format(self.name))\n",
" if not self.has_consumable_resource(available_resources):\n",
" raise Exception('Not enough consumable resources to execute {}'.format(self.name))\n",
" if not self.inorder(job_order):\n",
" raise Exception("Can't execute {} - execute prerequisite actions first".\n",
" format(self.name))\n",
" kb = super().act(kb, args) # update knowledge base\n",
" for resource in self.consumes: # remove consumed resources\n",
" available_resources[resource] -= self.consumes[resource]\n",
" self.completed = True # set the task status to complete\n",
" return kb\n",
"\n",
" def has_consumable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough consumable resources for this action to execute.\n",
" """\n",
" for resource in self.consumes:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.consumes[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def has_usable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough usable resources for this action to execute.\n",
" """\n",
" for resource in self.uses:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.uses[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def inorder(self, job_order):\n",
" """\n",
" Ensure that all the jobs that had to be executed before the current one have been\n",
" successfully executed.\n",
" """\n",
" for jobs in job_order:\n",
" if self in jobs:\n",
" for job in jobs:\n",
" if job is self:\n",
" return True\n",
" if not job.completed:\n",
" return False\n",
" return True\n",
"
def job_shop_problem():\n",
" """\n",
" [Figure 11.1] JOB-SHOP-PROBLEM\n",
"\n",
" A job-shop scheduling problem for assembling two cars,\n",
" with resource and ordering constraints.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> p = job_shop_problem()\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[1][0])\n",
" >>> p.act(p.jobs[1][1])\n",
" >>> p.act(p.jobs[1][2])\n",
" >>> p.act(p.jobs[0][0])\n",
" >>> p.act(p.jobs[0][1])\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[0][2])\n",
" >>> p.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
" resources = {'EngineHoists': 1, 'WheelStations': 2, 'Inspectors': 2, 'LugNuts': 500}\n",
"\n",
" add_engine1 = HLA('AddEngine1', precond='~Has(C1, E1)', effect='Has(C1, E1)', duration=30, use={'EngineHoists': 1})\n",
" add_engine2 = HLA('AddEngine2', precond='~Has(C2, E2)', effect='Has(C2, E2)', duration=60, use={'EngineHoists': 1})\n",
" add_wheels1 = HLA('AddWheels1', precond='~Has(C1, W1)', effect='Has(C1, W1)', duration=30, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" add_wheels2 = HLA('AddWheels2', precond='~Has(C2, W2)', effect='Has(C2, W2)', duration=15, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" inspect1 = HLA('Inspect1', precond='~Inspected(C1)', effect='Inspected(C1)', duration=10, use={'Inspectors': 1})\n",
" inspect2 = HLA('Inspect2', precond='~Inspected(C2)', effect='Inspected(C2)', duration=10, use={'Inspectors': 1})\n",
"\n",
" actions = [add_engine1, add_engine2, add_wheels1, add_wheels2, inspect1, inspect2]\n",
"\n",
" job_group1 = [add_engine1, add_wheels1, inspect1]\n",
" job_group2 = [add_engine2, add_wheels2, inspect2]\n",
"\n",
" return Problem(init='Car(C1) & Car(C2) & Wheels(W1) & Wheels(W2) & Engine(E2) & Engine(E2) & ~Has(C1, E1) & ~Has(C2, E2) & ~Has(C1, W1) & ~Has(C2, W2) & ~Inspected(C1) & ~Inspected(C2)',\n",
" goals='Has(C1, W1) & Has(C1, E1) & Inspected(C1) & Has(C2, W2) & Has(C2, E2) & Inspected(C2)',\n",
" actions=actions,\n",
" jobs=[job_group1, job_group2],\n",
" resources=resources)\n",
"
def double_tennis_problem():\n",
" """\n",
" [Figure 11.10] DOUBLE-TENNIS-PROBLEM\n",
"\n",
" A multiagent planning problem involving two partner tennis players\n",
" trying to return an approaching ball and repositioning around in the court.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> dtp = double_tennis_problem()\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)'))\n",
" >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)',\n",
" goals='Returned(Ball) & At(a, LeftNet) & At(a, RightNet)',\n",
" actions=[Action('Hit(actor, Ball, loc)',\n",
" precond='Approaching(Ball, loc) & At(actor, loc)',\n",
" effect='Returned(Ball)'),\n",
" Action('Go(actor, to, loc)', \n",
" precond='At(actor, loc)',\n",
" effect='At(actor, to) & ~At(actor, loc)')])\n",
"