{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Welcome! If you are new to Google Colab/Jupyter notebooks, you might take a look at [this notebook](https://colab.research.google.com/notebooks/basic_features_overview.ipynb) first.\n", "\n", "**I recommend you run the first code cell of this notebook immediately, to start provisioning drake on the cloud machine, then you can leave this window open as you [read the textbook](http://underactuated.csail.mit.edu/sysid.html).**\n", "\n", "# Notebook Setup\n", "\n", "The following cell will:\n", "- on Colab (only), install Drake to `/opt/drake`, install Drake's prerequisites via `apt`, and add pydrake to `sys.path`. This will take approximately two minutes on the first time it runs (to provision the machine), but should only need to reinstall once every 12 hours. If you navigate between notebooks using Colab's \"File->Open\" menu, then you can avoid provisioning a separate machine for each notebook.\n", "- import packages used throughout the notebook.\n", "\n", "You will need to rerun this cell if you restart the kernel, but it should be fast (even on Colab) because the machine will already have drake installed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import importlib\n", "import sys\n", "from urllib.request import urlretrieve\n", "\n", "# Install drake (and underactuated).\n", "if 'google.colab' in sys.modules and importlib.util.find_spec('underactuated') is None:\n", " urlretrieve(f\"http://underactuated.csail.mit.edu/scripts/setup/setup_underactuated_colab.py\",\n", " \"setup_underactuated_colab.py\")\n", " from setup_underactuated_colab import setup_underactuated\n", " setup_underactuated(underactuated_sha='ffe2b28ed89637889c04405e5d7d2d98be3df5b6', drake_version='0.27.0', drake_build='release')\n", "\n", "server_args = []\n", "if 'google.colab' in sys.modules:\n", " server_args = ['--ngrok_http_tunnel']\n", "\n", "from meshcat.servers.zmqserver import start_zmq_server_as_subprocess\n", "proc, zmq_url, web_url = start_zmq_server_as_subprocess(server_args=server_args)\n", "\n", "import numpy as np\n", "from ipywidgets import FloatSlider, ToggleButton\n", "from IPython.display import display, SVG\n", "import pydot\n", "\n", "from underactuated.jupyter import running_as_notebook\n", "\n", "np.set_printoptions(suppress=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# LQR for the Acrobot with joint offsets\n", "\n", "Adding only a very small offset (my default is 0.01 radians, or approximately half a degree for each joint) can lead to quite large steady-state errors.\n", "\n", "TODO: Make a plot of offset magnitude to steady-state error." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pydrake.all\n", "from pydrake.all import (\n", " DiagramBuilder, LinearQuadraticRegulator, Saturation, SceneGraph, Simulator, \n", " WrapToSystem, AddMultibodyPlantSceneGraph, Parser, Adder, ConstantVectorSource,\n", ")\n", "from pydrake.examples.acrobot import (AcrobotInput, AcrobotGeometry, \n", " AcrobotPlant, AcrobotState)\n", "\n", "\n", "def acrobot_balancing_example(offsets=[0.01,0.01]):\n", " builder = DiagramBuilder()\n", "\n", " def UprightState():\n", " state = AcrobotState()\n", " state.set_theta1(np.pi)\n", " state.set_theta2(0.)\n", " state.set_theta1dot(0.)\n", " state.set_theta2dot(0.)\n", " return state\n", "\n", " def BalancingLQR():\n", " # Design an LQR controller for stabilizing the Acrobot around the upright.\n", " # Returns a (static) AffineSystem that implements the controller (in\n", " # the original AcrobotState coordinates).\n", "\n", " acrobot = AcrobotPlant()\n", " context = acrobot.CreateDefaultContext()\n", "\n", " input = AcrobotInput()\n", " input.set_tau(0.)\n", " acrobot.get_input_port(0).FixValue(context, input)\n", "\n", " context.get_mutable_continuous_state_vector()\\\n", " .SetFromVector(UprightState().CopyToVector())\n", "\n", " Q = np.diag((10., 10., 1., 1.))\n", " R = [1]\n", "\n", " return LinearQuadraticRegulator(acrobot, context, Q, R)\n", "\n", "\n", " acrobot = builder.AddSystem(AcrobotPlant())\n", " acrobot.set_name('acrobot')\n", "\n", " saturation = builder.AddSystem(Saturation(min_value=[-10], max_value=[10]))\n", " saturation.set_name('saturation')\n", " builder.Connect(saturation.get_output_port(0), acrobot.get_input_port(0))\n", " wrapto = builder.AddSystem(WrapToSystem(4))\n", " wrapto.set_name('wrap_angles')\n", " wrapto.set_interval(0, 0, 2. * np.pi)\n", " wrapto.set_interval(1, -np.pi, np.pi)\n", " builder.Connect(acrobot.get_output_port(0), wrapto.get_input_port(0))\n", " controller = builder.AddSystem(BalancingLQR())\n", " controller.set_name(\"lqr controller\")\n", "\n", " # I'll add the offsets in here:\n", " adder = builder.AddSystem(Adder(2, 4))\n", " adder.set_name(\"adder\")\n", " builder.Connect(wrapto.get_output_port(), adder.get_input_port(0))\n", " builder.Connect(adder.get_output_port(), controller.get_input_port())\n", " offset = builder.AddSystem(ConstantVectorSource(np.array(offsets + [0, 0])))\n", " offset.set_name(\"state calibration offsets\")\n", " builder.Connect(offset.get_output_port(), adder.get_input_port(1))\n", "\n", " builder.Connect(controller.get_output_port(), saturation.get_input_port())\n", " # Setup visualization\n", " scene_graph = builder.AddSystem(SceneGraph())\n", " AcrobotGeometry.AddToBuilder(builder, acrobot.get_output_port(0), scene_graph)\n", " visualizer = pydrake.systems.meshcat_visualizer.ConnectMeshcatVisualizer(\n", " builder, \n", " scene_graph=scene_graph, \n", " zmq_url=zmq_url)\n", " visualizer.vis.delete()\n", " visualizer.set_planar_viewpoint(xmin=-3.0, xmax=3.0, ymin=-3.0, ymax=4.0)\n", "\n", " diagram = builder.Build()\n", "\n", " display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))\n", "\n", " # Set up a simulator to run this diagram\n", " simulator = Simulator(diagram)\n", " simulator.set_target_realtime_rate(1.0)\n", " context = simulator.get_mutable_context()\n", "\n", " # Simulate\n", " duration = 4.0 if running_as_notebook else 0.1 # sets a shorter duration during testing\n", " for i in range(5):\n", " context.SetTime(0.)\n", " context.SetContinuousState(UprightState().CopyToVector() +\n", " 0.05 * np.random.randn(4,))\n", " simulator.Initialize()\n", " simulator.AdvanceTo(duration)\n", "\n", "acrobot_balancing_example()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Extracting the lumped parameters from the Cart-Pole\n", "\n", "Recall that the dynamics of our cart-pole system were\n", "\n", "\\begin{gather}\n", "(m_c + m_p)\\ddot{x} + m_p l \\ddot\\theta \\cos\\theta - m_p l \\dot\\theta^2 \\sin\\theta = f_x \\\\\n", "m_p l \\ddot{x} \\cos\\theta + m_p l^2 \\ddot\\theta + m_p g l \\sin\\theta = 0\n", "\\end{gather}\n", "\n", "Note: This example requires https://github.com/RobotLocomotion/drake/pull/14863 , which is not available in the drake binaries quite yet." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from pandas import DataFrame\n", "from pydrake.all import (\n", " MultibodyPlant, Parser, MakeVectorVariable, MultibodyForces_, SpatialInertia_, UnitInertia_,\n", " Expression, Variable, Polynomial, Variables, DecomposeLumpedParameters, Evaluate,\n", " PiecewisePolynomial, TrajectorySource,\n", " DiagramBuilder, Simulator, LogOutput\n", ") \n", "from underactuated import FindResource\n", "\n", "def cartpole_generate_data():\n", " fig, ax = plt.subplots(2, 2)\n", " ax[0,0].set_xlabel('t')\n", " ax[0,0].set_ylabel('x')\n", " ax[1,0].set_xlabel('t')\n", " ax[1,0].set_ylabel('theta')\n", " ax[0,1].set_xlabel('t')\n", " ax[0,1].set_ylabel('xdot')\n", " ax[1,1].set_xlabel('t')\n", " ax[1,1].set_ylabel('thetadot')\n", "\n", " data = []\n", "\n", " for i in range(4):\n", " ts = np.arange(0, 4, 0.1)\n", " utraj = PiecewisePolynomial.CubicShapePreserving(ts, 10*np.sin((i+1)*ts).reshape((1,-1)))\n", "\n", " builder = DiagramBuilder()\n", "\n", " plant = builder.AddSystem(MultibodyPlant(0.0))\n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " usys = builder.AddSystem(TrajectorySource(utraj)) \n", " builder.Connect(usys.get_output_port(), plant.get_actuation_input_port())\n", "\n", " logger = LogOutput(plant.get_state_output_port(), builder)\n", " diagram = builder.Build()\n", "\n", " simulator = Simulator(diagram)\n", " simulator.AdvanceTo(ts[-1])\n", "\n", " data.append(DataFrame({\n", " 't': logger.sample_times(), \n", " 'u': utraj.vector_values(logger.sample_times())[0,:],\n", " 'x': logger.data()[0,:],\n", " 'theta': logger.data()[1,:],\n", " 'xdot': logger.data()[2,:],\n", " 'thetadot': logger.data()[3,:],\n", " }))\n", "\n", " ax[0,0].plot(data[i].t, data[i].x.T)\n", " ax[1,0].plot(data[i].t, data[i].theta.T)\n", " ax[0,1].plot(data[i].t, data[i].xdot.T)\n", " ax[1,1].plot(data[i].t, data[i].thetadot.T)\n", "\n", " return data\n", " \n", "\n", "data = cartpole_generate_data()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "def cartpole_estimate_lumped_parameters(data):\n", " plant = MultibodyPlant(0.0)\n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " sym_plant = plant.ToSymbolic()\n", " sym_context = sym_plant.CreateDefaultContext()\n", "\n", " # State variables\n", " q = MakeVectorVariable(2, \"q\")\n", " v = MakeVectorVariable(2, \"v\")\n", " vd = MakeVectorVariable(2, \"vd\")\n", " tau = MakeVectorVariable(1, \"u\")\n", "\n", " # Parameters\n", " mc = Variable(\"mc\")\n", " mp = Variable(\"mp\")\n", " l = Variable(\"l\")\n", "\n", " sym_plant.get_actuation_input_port().FixValue(sym_context, tau)\n", " sym_plant.SetPositions(sym_context, q)\n", " sym_plant.SetVelocities(sym_context, v)\n", "\n", " cart = sym_plant.GetBodyByName(\"cart\")\n", " cart.SetMass(sym_context, mc)\n", " pole = sym_plant.GetBodyByName(\"pole\")\n", " inertia = SpatialInertia_[Expression](mp, [0, 0, -l], UnitInertia_[Expression](l*l,l*l,0))\n", " pole.SetSpatialInertiaInBodyFrame(sym_context, inertia)\n", "\n", " # TODO: Set the joint damping pending resolution of drake #14405\n", " # x = sym_plant.GetJointByName(\"x\")\n", " # theta = sym_plant.GetJointByName(\"theta\")\n", "\n", " forces = MultibodyForces_[Expression](sym_plant)\n", " sym_plant.CalcForceElementsContribution(sym_context, forces)\n", " error = sym_plant.CalcInverseDynamics(sym_context, vd, forces) - sym_plant.MakeActuationMatrix() @ tau\n", "\n", " print('Symbolic dynamics: ')\n", " print(error[0].Expand())\n", " print(error[1].Expand())\n", " print('')\n", "\n", " W, alpha, w0 = DecomposeLumpedParameters(error, [mc, mp, l])\n", "\n", " # Form the data matrix \n", " # TODO: Update this to the vectorized version pending drake #8495\n", "# env = {\n", "# q[0]: np.concatenate([d.x[:-1] for d in data]),\n", "# q[1]: np.concatenate([d.theta[:-1] for d in data]),\n", "# v[0]: np.concatenate([d.xdot[:-1] for d in data]),\n", "# v[1]: np.concatenate([d.thetadot[:-1] for d in data]),\n", "# vd[0]: np.concatenate([(d.xdot[1:]-d.xdot[:-1])/h for d in data]),\n", "# vd[1]: np.concatenate([(d.thetadot[1:]-d.thetadot[:-1])/h for d in data]),\n", "# tau[0]:np.concatenate([d.u[:-1] for d in data])\n", "# } \n", "# Wdata = Evaluate(W, env)\n", " M = data[0].t.shape[0] - 1\n", " MM = 2*M*len(data)\n", " N = alpha.size\n", " Wdata = np.zeros((MM, N))\n", " w0data = np.zeros((MM, 1))\n", " offset = 0\n", " for d in data:\n", " for i in range(M):\n", " h = d.t[i+1]-d.t[i]\n", " env = {\n", " q[0]: d.x[i],\n", " q[1]: d.theta[i],\n", " v[0]: d.xdot[i],\n", " v[1]: d.thetadot[i],\n", " vd[0]: (d.xdot[i+1]-d.xdot[i])/h,\n", " vd[1]: (d.thetadot[i+1]-d.thetadot[i])/h,\n", " tau[0]: d.u[i],\n", " }\n", " \n", " Wdata[offset:offset+2,:] = Evaluate(W, env)\n", " w0data[offset:offset+2] = Evaluate(w0, env)\n", " offset += 2\n", "\n", " alpha_fit = np.linalg.lstsq(Wdata, -w0data, rcond=None)[0]\n", " alpha_true = Evaluate(alpha, {mc: 10, mp: 1, l: .5})\n", "\n", " print('Estimated Lumped Parameters:')\n", " for i in range(len(alpha)):\n", " print(f\"{alpha[i]}. URDF: {alpha_true[i,0]}, Estimated: {alpha_fit[i,0]}\")\n", "\n", "cartpole_estimate_lumped_parameters(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Cart-pole balancing with an identified model\n", "\n", "Let's do this in a few steps. First the setup..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from pydrake.all import (\n", " DiagramBuilder, Simulator, LogOutput,\n", " Adder, Gain, PiecewisePolynomial, TrajectorySource,\n", " AddMultibodyPlantSceneGraph, Parser, SpatialInertia, UnitInertia,\n", " LinearQuadraticRegulator, ConnectMeshcatVisualizer,\n", ")\n", "from underactuated import FindResource\n", "from pandas import DataFrame\n", "\n", "def UprightState():\n", " state = [0, np.pi, 0, 0]\n", " return state\n", "\n", "def BalancingLQR(plant, context):\n", " # Set u0 = 0\n", " plant.get_actuation_input_port().FixValue(context, [0])\n", " # Set x0 = UprightState()\n", " plant.SetPositionsAndVelocities(context, UprightState())\n", "\n", " Q = np.diag([10., 10., 1., 1.])\n", " R = [1]\n", "\n", " return LinearQuadraticRegulator(plant, context,\n", " Q, R, input_port_index=int(plant.get_actuation_input_port().get_index()))\n", "\n", "\n", "def cartpole_balancing_example(change_params=False, input_excitation=False, controller=None):\n", " builder = DiagramBuilder()\n", " plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.1)\n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " context = plant.CreateDefaultContext()\n", "\n", " if (change_params):\n", " # urdf has mc=10, mp=1, l=.5\n", " mc = 6.0\n", " mp = 1.5\n", " l = 0.5\n", " cart = plant.GetBodyByName(\"cart\")\n", " cart.SetMass(context, mc)\n", " pole = plant.GetBodyByName(\"pole\")\n", " inertia = SpatialInertia(mp, [0, 0, -l], UnitInertia(l*l,l*l,0))\n", " pole.SetSpatialInertiaInBodyFrame(context, inertia)\n", "\n", " # TODO: Set the joint damping pending resolution of drake #14405\n", "\n", " x_log = LogOutput(plant.get_state_output_port(), builder)\n", "\n", " if controller:\n", " controller = builder.AddSystem(controller)\n", " else:\n", " controller = builder.AddSystem(BalancingLQR(plant, context))\n", " builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))\n", " if input_excitation:\n", " # Add an uncorrelated (sine wave) input to make the dataset more rich\n", " ts = np.arange(0, 8.0, 0.1)\n", " pp = PiecewisePolynomial.CubicShapePreserving(ts, .1*np.sin(ts).reshape((1,-1)))\n", " noise = builder.AddSystem(TrajectorySource(pp))\n", "\n", " adder = builder.AddSystem(Adder(2, 1))\n", " builder.Connect(noise.get_output_port(), adder.get_input_port(0))\n", "\n", " builder.Connect(controller.get_output_port(), adder.get_input_port(1))\n", " builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())\n", " u_log = LogOutput(adder.get_output_port(), builder)\n", " else:\n", " builder.Connect(controller.get_output_port(0),\n", " plant.get_actuation_input_port())\n", " u_log = LogOutput(controller.get_output_port(), builder)\n", "\n", " diagram = builder.Build()\n", "\n", " # Set up a simulator to run this diagram\n", " simulator = Simulator(diagram)\n", " context = simulator.get_mutable_context()\n", "\n", " simulator.get_mutable_integrator().set_fixed_step_mode(True)\n", " simulator.get_mutable_integrator().set_maximum_step_size(0.1)\n", "\n", " fig, ax = plt.subplots(1, 2)\n", " ax[0].set_xlabel('time (sec)')\n", " ax[0].set_ylabel('x (m)')\n", " ax[1].set_xlabel('time (sec)')\n", " ax[1].set_ylabel('theta (rad)')\n", "\n", " data = []\n", "\n", " # Simulate\n", " duration = 8.0 if running_as_notebook else 0.1 # sets a shorter duration during testing\n", " for i in range(15):\n", " u_log.reset()\n", " x_log.reset()\n", " context.SetTime(0.)\n", " plant.SetPositionsAndVelocities(\n", " plant.GetMyContextFromRoot(context),\n", " UprightState() + 0.01 * np.random.randn(4,)\n", " )\n", " simulator.Initialize()\n", " simulator.AdvanceTo(duration)\n", "\n", " data.append(DataFrame({\n", " 't': x_log.sample_times(), \n", " 'u': u_log.data()[0,:],\n", " 'x': x_log.data()[0,:],\n", " 'theta': x_log.data()[1,:],\n", " 'xdot': x_log.data()[2,:],\n", " 'thetadot': x_log.data()[3,:],\n", " }))\n", "\n", " ax[0].plot(data[-1].t, data[-1].x)\n", " ax[1].plot(data[-1].t, data[-1].theta)\n", "\n", " plt.show()\n", " return data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's run the original cart-pole LQR controller (with a known model) again, to remember what the solutions look like." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = cartpole_balancing_example(change_params=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's run it again, but this time with the LQR controller designed using the *wrong* parameters for the cart-pole. You can see that it still keeps the cart-pole near the fixed point, but it's not converging very nicely! " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = cartpole_balancing_example(change_params=True, input_excitation=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's run (linear) system identification on the data that used the wrong controller." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def linear_system_id(data):\n", " N = np.array([d.t.size for d in data])\n", " X = np.zeros((4,sum(N-1)))\n", " Xn = np.zeros((4,sum(N-1)))\n", " U = np.zeros((1,sum(N-1)))\n", " offset = 0\n", " for d in data:\n", " N = d.t.size\n", " assert(np.allclose(np.diff(d.t), 0.1)) # make sure no extra time-samples snuck in\n", " X[0, offset:offset+N-1] = d.x[:-1]\n", " X[1, offset:offset+N-1] = d.theta[:-1]\n", " X[2, offset:offset+N-1] = d.xdot[:-1]\n", " X[3, offset:offset+N-1] = d.thetadot[:-1]\n", " U[0, offset:offset+N-1] = d.u[:-1]\n", " Xn[0, offset:offset+N-1] = d.x[1:]\n", " Xn[1, offset:offset+N-1] = d.theta[1:]\n", " Xn[2, offset:offset+N-1] = d.xdot[1:]\n", " Xn[3, offset:offset+N-1] = d.thetadot[1:]\n", " offset += N-1\n", "\n", " u0 = 0\n", " x0 = UprightState()\n", " U = U - u0\n", " for i in range(4):\n", " X[i,:] = X[i,:] - x0[i]\n", " Xn[i,:] = Xn[i,:] - x0[i]\n", "\n", " # Sanity check... (can make this go to zero if I narrow the initial conditions)\n", " #A, B = linearize_cartpole() # currently defined in the cell below...\n", " #print(np.linalg.norm(A @ X + B @ U - Xn, axis=1))\n", "\n", " # Xn = [A,B][X;U] or [X',U']*[A';B'] = Xn'\n", " ATBT, residuals, rank, s = np.linalg.lstsq(np.hstack((X.T,U.T)), Xn.T, rcond=None)\n", " A = ATBT[:4,:].T\n", " B = ATBT[-1,:].reshape((4,1))\n", " print(f'Least squares rank (out of possible 5): {rank}')\n", " print(f'residuals: {np.linalg.norm(A @ X + B @ U - Xn, axis=1)}')\n", " return A, B\n", "\n", "Ahat, Bhat = linear_system_id(data)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Try this**: Try generating data without adding noise to the controller. You'll see that the least-squares solution drops rank (and the controller will be garbage).\n", "\n", "Let's compare this with the true linearized dynamics of the cart-pole." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydrake.all import MultibodyPlant, Linearize\n", "\n", "def linearize_cartpole():\n", " # Note the discrete time here matches the sample time above.\n", " plant = MultibodyPlant(time_step=0.1) \n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " context = plant.CreateDefaultContext()\n", " # Set u0 = 0\n", " plant.get_actuation_input_port().FixValue(context, [0])\n", " # Set x0 = UprightState()\n", " plant.SetPositionsAndVelocities(context, UprightState())\n", " linear_sys = Linearize(plant, context, \n", " input_port_index=plant.get_actuation_input_port().get_index(), \n", " output_port_index=plant.get_state_output_port().get_index())\n", " return linear_sys.A(), linear_sys.B()\n", "\n", "\n", "A, B = linearize_cartpole()\n", "\n", "np.set_printoptions(formatter={'float': lambda x: \"{0:0.4f}\".format(x)})\n", "print(\"A = \")\n", "print(A)\n", "print(\"Â = \")\n", "print(Ahat)\n", "print(\"\")\n", "print(\"B = \")\n", "print(B)\n", "print(\"B̂ = \")\n", "print(Bhat)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, let's run the cart-pole in closed-loop with an LQR controller designed from the estimated $A$ and $B$ matrices." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydrake.all import AffineSystem, DiscreteTimeLinearQuadraticRegulator\n", "\n", "def cartpole_lqr_with_identified_model(A, B):\n", " Q = np.diag([10., 10., 1., 1.])\n", " R = [1]\n", " K, S = DiscreteTimeLinearQuadraticRegulator(A, B, Q, R)\n", " # u = u0 - K*(x-x0) = (u0 + K*x0) - K*x\n", " u0 = [0]\n", " x0 = UprightState()\n", " controller = AffineSystem(\n", " A=np.zeros((0,0)), B=np.zeros((0,4)), f0=np.zeros((0,1)), \n", " C=np.zeros((1,0)), D=-K, y0=u0+K @ x0)\n", "\n", " cartpole_balancing_example(controller=controller)\n", "\n", "cartpole_lqr_with_identified_model(Ahat, Bhat)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Cart-pole identification from keypoints (using Ho-Kalman)\n", "\n", "Let's generate input-output data with the cart-pole with some additional random (Gaussian iid) input excitation. This time, instead of logging the true state, I'll include an additional block which outputs a number of keypoints on the cart and the pole." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from pydrake.all import (\n", " DiagramBuilder, Simulator, LogOutput,\n", " Adder, ConstantVectorSource, RandomSource, RandomDistribution, Gain,\n", " AddMultibodyPlantSceneGraph, Parser, RigidTransform_,\n", " LinearQuadraticRegulator, \n", " TemplateSystem, LeafSystem_, AbstractValue, BasicVector_,\n", ")\n", "from underactuated import FindResource\n", "from underactuated.jupyter import running_as_notebook, ToLatex\n", "from IPython.display import display, Latex, SVG\n", "import pydot\n", "\n", "def UprightState():\n", " state = [0, np.pi, 0, 0]\n", " return state\n", "\n", "def BalancingLQR(plant, context):\n", " # Set u0 = 0\n", " plant.get_actuation_input_port().FixValue(context, [0])\n", " # Set x0 = UprightState()\n", " plant.SetPositionsAndVelocities(context, UprightState())\n", "\n", " Q = np.diag([10., 10., 1., 1.])\n", " R = [1]\n", "\n", " return LinearQuadraticRegulator(plant, context,\n", " Q, R, input_port_index=int(plant.get_actuation_input_port().get_index()))\n", "\n", "\n", "@TemplateSystem.define(\"KeypointSystem_\")\n", "def KeypointSystem_(T):\n", "\n", " class Impl(LeafSystem_[T]):\n", "\n", " def _construct(self, plant, converter=None):\n", " LeafSystem_[T].__init__(self, converter)\n", "\n", " # Kc are the cart keypoints\n", " self.p_CKc = np.array([[.3, 0, .4], \n", " [.3, 0, .1], \n", " [-.3, 0, .1],\n", " [-.3, 0, .4],\n", " [-.15, 0, .025],\n", " [.15, 0, .025]]).T\n", "\n", " # Kp are the pole keypoints\n", " l = np.linspace(start=-.5, stop=0, num=3)\n", " self.p_PKp = np.hstack((\n", " np.vstack((0*l+0.01, 0*l, l)),\n", " np.vstack((0*l-0.01, 0*l, l))\n", " ))\n", " self.num_keypoints = self.p_CKc.shape[1] + self.p_PKp.shape[1]\n", "\n", " self.cart_index = int(plant.GetBodyByName(\"cart\").index())\n", " self.pole_index = int(plant.GetBodyByName(\"pole\").index())\n", "\n", " self.DeclareAbstractInputPort(\n", " \"body_poses\", AbstractValue.Make([RigidTransform_[T]()]*3))\n", " self.DeclareVectorOutputPort(\n", " \"keypoints\", \n", " BasicVector_[T](2 * self.num_keypoints),\n", " self.CalcKeypoints)\n", "\n", " # Save only for scalar copy construction\n", " self.plant_ = plant\n", "\n", " def _construct_copy(self, other, converter=None):\n", " Impl._construct(self, other.plant_, converter=converter)\n", "\n", " def CalcKeypoints(self, context, output):\n", " body_poses = self.get_input_port().Eval(context)\n", " X_WC = body_poses[self.cart_index]\n", " X_WP = body_poses[self.pole_index]\n", "\n", " p_WK = np.hstack((X_WC.multiply(self.p_CKc), X_WP.multiply(self.p_PKp)))\n", " output.SetFromVector((p_WK[[0,2],:]).reshape(-1,1))\n", "\n", " return Impl\n", "\n", "KeypointSystem = KeypointSystem_[None] # Default instantiation\n", "\n", "\n", "def cartpole_balancing_keypoints():\n", " builder = DiagramBuilder()\n", " plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.1)\n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " # Add the LQR controller.\n", " context = plant.CreateDefaultContext()\n", " controller = builder.AddSystem(BalancingLQR(plant, context))\n", " controller.set_name(\"LQR controller\")\n", " builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))\n", "\n", " # Add an iid Gaussian input for identification\n", " excitation = builder.AddSystem(RandomSource(RandomDistribution.kGaussian, 1, 0.1))\n", " excitation.set_name(\"excitation\")\n", " adder = builder.AddSystem(Adder(2, 1))\n", " gain1 = builder.AddSystem(Gain(.025, 1))\n", " gain2 = builder.AddSystem(Gain(100.0, 1))\n", " builder.Connect(excitation.get_output_port(), gain1.get_input_port())\n", " builder.Connect(gain1.get_output_port(), gain2.get_input_port())\n", " builder.Connect(gain2.get_output_port(), adder.get_input_port(0))\n", "# builder.Connect(excitation.get_output_port(), adder.get_input_port(0))\n", " builder.Connect(controller.get_output_port(), adder.get_input_port(1))\n", " builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())\n", "\n", " # We will identify the map from this input to the sensors (lumping the LQR controller into the identified dynamics)\n", " u_log = LogOutput(gain1.get_output_port(), builder)\n", " u_log.set_name(\"input log\")\n", "\n", " # Add and log keypoint outputs\n", " keypoints = builder.AddSystem(KeypointSystem(plant))\n", " keypoints.set_name(\"keypoints\")\n", " builder.Connect(plant.get_body_poses_output_port(), keypoints.get_input_port())\n", " y_log = LogOutput(keypoints.get_output_port(), builder)\n", " y_log.set_name(\"output log\")\n", "\n", " diagram = builder.Build()\n", "\n", " display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))\n", " \n", " # Set up a simulator to run this diagram\n", " simulator = Simulator(diagram)\n", " context = simulator.get_mutable_context()\n", "\n", " simulator.get_mutable_integrator().set_fixed_step_mode(True)\n", " simulator.get_mutable_integrator().set_maximum_step_size(0.1)\n", "\n", " data = []\n", " context.SetTime(0.)\n", " plant.SetPositionsAndVelocities(plant.GetMyContextFromRoot(context), UprightState())\n", " simulator.Initialize()\n", " y0 = keypoints.get_output_port().Eval(keypoints.GetMyContextFromRoot(context))\n", " u0 = 0\n", "\n", " # Simulate\n", " duration = 15.0 if running_as_notebook else .5 # sets a shorter duration during testing\n", " for i in range(25):\n", " u_log.reset()\n", " y_log.reset()\n", " context.SetTime(0.)\n", " plant.SetPositionsAndVelocities(\n", " plant.GetMyContextFromRoot(context), \n", " UprightState() + 0.1 * np.random.randn(4,)\n", " )\n", " simulator.Initialize()\n", " simulator.AdvanceTo(duration)\n", "\n", " y_out = np.copy(y_log.data())\n", " for i in range(y_out.shape[0]):\n", " y_out[i,:] -= y0[i]\n", " \n", " data.append({'t': np.copy(y_log.sample_times()),\n", " 'u': np.copy(u_log.data()),\n", " 'y': y_out})\n", "\n", " return data, u0, y0\n", "\n", "data, u0, y0 = cartpole_balancing_keypoints()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's a simple animation of the keypoints, so we understand what we're asking the algorithm to do. (Rendering the actual animation to jshtml is surprisingly slow...)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "import matplotlib.animation as animation\n", "from IPython.display import display, HTML\n", "plt.rcParams.update({\"savefig.transparent\": True})\n", "\n", "def draw_keypoints(data, y0, num_trials=-1):\n", " fig, ax = plt.subplots(figsize=(8,3))\n", " ax.axis('equal')\n", " ax.set(xlim=(-2, 2), ylim=(-.5, 1.0))\n", " frames = []\n", " for trial in data[:num_trials]:\n", " for n in range(trial['t'].size):\n", " y = (trial['y'][:, n] + y0).reshape(2,-1)\n", " frames.append(ax.plot(y[0,:], y[1,:], 'b.'))\n", " \n", " ani = animation.ArtistAnimation(fig, frames, interval=100, blit=True, repeat=True)\n", " plt.close('all')\n", " display(HTML(ani.to_jshtml()))\n", "\n", "draw_keypoints(data, y0, num_trials=3)\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's plot the $y$ data, to make sure it seems reasonable..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "fig,ax = plt.subplots()\n", "for trial in data:\n", " ax.plot(trial['y'].T)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's implement the actual Ho-Kalman algorithm" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from pydrake.all import MultibodyPlant, Linearize\n", "\n", "def EstimateG(data, N):\n", " # N=0 fits y[k] = Du[k]\n", " # N=1 fits y[k] = CBu[k-1] + Du[k]\n", " # ...\n", " nu = data[0]['u'].shape[0]\n", " ny = data[0]['y'].shape[0]\n", " num_data = np.sum([trial['t'].size - N for trial in data])\n", " U = np.ones((nu*(N+1), num_data)) # U[:,i] = [u[k-N]; ...; u[k]]\n", " Y = np.zeros((ny, num_data))\n", " offset = 0\n", " for trial in data:\n", " nt = trial['t'].size\n", " this_u = trial['u']\n", " this_y = trial['y']\n", " for n in range(nt - N):\n", " # y[k] depends on u[k-N] to u[k]\n", " k = n+N\n", " U[:, n+offset] = this_u[:, n:k+1].reshape((-1,),order='F')\n", " Y[:, n+offset] = this_y[:, k]\n", " offset += nt - N\n", " # Least squares: Y = G*U => Y.T = U.T*G.T => a=U.T, b=Y.T, x=G.T\n", " GhatT, residuals, rank, s = np.linalg.lstsq(a=U.T, b=Y.T, rcond=None)\n", " print(f'Least squares rank (out of possible {N+1}): {rank}')\n", " return GhatT.T\n", "\n", "N = 100 if running_as_notebook else 2\n", "Ghat = EstimateG(data, N)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's compare our estimatation, $\\hat{\\bf G}$ to the ${\\bf G}$ that we get from linearizing the plant (with autodiff), and computing the true impulse response. I've plotted the results for two of the keypoints. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def linearize_cartpole():\n", " builder = DiagramBuilder()\n", "\n", " # Note the discrete time here matches the sample time above.\n", " plant = builder.AddSystem(MultibodyPlant(time_step=0.1))\n", " file_name = FindResource(\"models/cartpole.urdf\")\n", " Parser(plant).AddModelFromFile(file_name)\n", " plant.Finalize()\n", "\n", " controller = builder.AddSystem(BalancingLQR(plant, plant.CreateDefaultContext()))\n", " builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))\n", " adder = builder.AddSystem(Adder(2, 1))\n", " gain = builder.AddSystem(Gain(100.0, 1))\n", " builder.Connect(gain.get_output_port(), adder.get_input_port(0))\n", " builder.Connect(controller.get_output_port(), adder.get_input_port(1))\n", " builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())\n", "\n", " keypoints = builder.AddSystem(KeypointSystem(plant))\n", " builder.Connect(plant.get_body_poses_output_port(), keypoints.get_input_port())\n", "\n", " builder.ExportInput(gain.get_input_port())\n", " builder.ExportOutput(keypoints.get_output_port())\n", " diagram = builder.Build()\n", "\n", " context = diagram.CreateDefaultContext()\n", " plant_context = plant.GetMyContextFromRoot(context)\n", "\n", " # Set u0 = 0\n", " diagram.get_input_port().FixValue(context, [0])\n", " # Set x0 = UprightState()\n", " plant.SetPositionsAndVelocities(plant_context, UprightState())\n", "\n", " linear_sys = Linearize(diagram, context)\n", " return linear_sys.A(), linear_sys.B(), linear_sys.C(), linear_sys.D()\n", "\n", "def BuildG(A, B, C, D, nt):\n", " ny, nu = D.shape\n", " nx = A.shape[0]\n", " G = np.zeros((ny, (nt+1)*nu))\n", " G[:, -nu] = D.reshape(-1,)\n", " AkB = B\n", " for k in range(nt):\n", " G[:, -(k+2)*nu:-(k+1)*nu] = C @ AkB\n", " AkB = A @ AkB \n", " return G\n", "\n", "\n", "A, B, C, D = linearize_cartpole()\n", "G = BuildG(A,B,C,D,N)\n", "\n", "keypoint_inds = [1, 9]\n", "fig, ax = plt.subplots(1,len(keypoint_inds))\n", "for i in range(len(keypoint_inds)):\n", " ax[i].plot(np.flip(Ghat[keypoint_inds[i],:]),label='estimated')\n", " ax[i].plot(np.flip(G[keypoint_inds[i],:]),label='actual')\n", " ax[i].set_title(f'keypoint {keypoint_inds[i]} impulse response')\n", " ax[i].legend()\n", "\n", "#display(Latex(f'\\\\begin{{align*}} G =& {ToLatex(G[keypoint_inds,:])} \\\\\\\\ \\\\hat{{G}} =& {ToLatex(Ghat[keypoint_inds,:])} \\\\end{{align*}}'))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's extract the estimated ${\\bf \\hat{A}, \\hat{B}, \\hat{C}, \\hat{D}}$ matrices" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "def HoKalman(G, nu, nx, m, n):\n", " # Ho-Kalman algorithm, aka Eigensystem Realization Algorithm (ERA)\n", " # G are the Markov parameters [CA^{N-1}B, ..., CB, D]\n", " # nu is the number of inputs\n", " # nx is the desired number of states in the realization\n", " # m,n are the number of (block) rows/columns in the Hankel matrix\n", " # with m*nu >= n*ny and m+n <= N\n", " \n", " ny = G.shape[0]\n", " \n", " # Construct the block Hankel data matrix\n", " H = np.zeros((m*ny, n*nu))\n", " H2 = np.zeros((m*ny, n*nu))\n", " for i in range(m):\n", " Hrows = i*ny + np.arange(ny)\n", " for j in range(n):\n", " Hcols = j*nu + np.arange(nu)\n", " Gcols = -((j+i+2)*nu) + np.arange(nu)\n", " H[np.ix_(Hrows, Hcols)] = G[:, Gcols]\n", " Gcols2 = -((j+i+3)*nu) + np.arange(nu)\n", " H2[np.ix_(Hrows, Hcols)] = G[:, Gcols2]\n", "\n", " U, s, VT = np.linalg.svd(H, full_matrices=False)\n", " sxhalf = np.sqrt(s[:nx])\n", " Ux = U[:,:nx]\n", " VxT = VT[:nx, :]\n", " B = np.diag(sxhalf) @ VxT[:,:nu]\n", " C = Ux[:ny,:] @ np.diag(sxhalf)\n", " Sminushalf = np.diag(1/sxhalf)\n", " A = Sminushalf @ Ux.T @ H2 @ VxT.T @ Sminushalf\n", " D = G[:,-nu + np.arange(nu)]\n", " \n", " return A, B, C, D, s[:nx]\n", " \n", "m = round(N/2)\n", "Ahat, Bhat, Chat, Dhat, sigma = HoKalman(Ghat, nu=1, nx=10, m=m, n=N-m)\n", "fig, ax = plt.subplots(figsize=(6,2))\n", "ax.plot(sigma,'.')\n", "plt.yscale(\"log\")\n", "ax.set_title(\"Hankel singular values (log scale)\")\n", " \n", "keypoint_inds = range(5,11)\n", "display(Latex(f'\\\\begin{{align*}} \\hat{{A}} =& {ToLatex(Ahat)} & \\hat{{B}} =& {ToLatex(Bhat)} \\\\\\\\ \\hat{{C}} =& {ToLatex(Chat[keypoint_inds,:])} & \\hat{{D}} =& {ToLatex(Dhat[keypoint_inds,:])} \\\\end{{align*}}'))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's interesting that the magnitude of the Hankel singular values drop sharply after the first 4 when we use the true ${\\bf G}$ obtained from linearization. $\\hat{\\bf G}$ doesn't have noise, but it was generated by simulating the *nonlinear* dynamics, so it has a somewhat similar effect." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "_, _, _, _, sigma = HoKalman(G, nu=1, nx=10, m=m, n=N-m) # I've used G here instead of Ghat\n", "fig, ax = plt.subplots(figsize=(6,2))\n", "ax.plot(sigma,'.')\n", "plt.yscale(\"log\")\n", "ax.set_title(\"Hankel singular values (log scale)\");" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remember that ${\\bf \\hat{A}, \\hat{B}, \\hat{C}, \\hat{D}}$ will only approximate the original ${\\bf A, B, C, D}$ up to a similarity transform. But they should describe the same input-output behavior." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Ghat_reconstructed = BuildG(Ahat,Bhat,Chat,Dhat,N)\n", "\n", "keypoint_inds = [1, 9]\n", "fig, ax = plt.subplots(1,len(keypoint_inds))\n", "for i in range(len(keypoint_inds)):\n", " ax[i].plot(np.flip(Ghat[keypoint_inds[i],:]),label='estimated')\n", " ax[i].plot(np.flip(G[keypoint_inds[i],:]),label='actual')\n", " ax[i].plot(np.flip(Ghat_reconstructed[keypoint_inds[i],:]),label='reconstructed')\n", " ax[i].set_title(f'keypoint {keypoint_inds[i]} impulse response')\n", " ax[i].legend()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.4" }, "metadata": { "interpreter": { "hash": "bbee7842ce8ba476870a006d5d5b68f11cea175afb0fea017b7f81beccf88892" } } }, "nbformat": 4, "nbformat_minor": 4 }