{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[Table of Contents](http://nbviewer.ipython.org/github/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/table_of_contents.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# FilterPy Source" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For your convienence I have loaded several of FilterPy's core algorithms into this appendix. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## KalmanFilter" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/kalman_filter.py\n", "\n", "\"\"\"Copyright 2014 Roger R Labbe Jr.\n", "\n", "filterpy library.\n", "http://github.com/rlabbe/filterpy\n", "\n", "Documentation at:\n", "https://filterpy.readthedocs.org\n", "\n", "Supporting book at:\n", "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", "\n", "This is licensed under an MIT license. See the readme.MD file\n", "for more information.\n", "\"\"\"\n", "\n", "from __future__ import (absolute_import, division, print_function,\n", " unicode_literals)\n", "import numpy as np\n", "import scipy.linalg as linalg\n", "from numpy import dot, zeros, eye, isscalar\n", "from filterpy.common import setter, setter_1d, setter_scalar, dot3\n", "\n", "\n", "\n", "class KalmanFilter(object):\n", " \"\"\" Implements a Kalman filter. You are responsible for setting the\n", " various state variables to reasonable values; the defaults will\n", " not give you a functional filter.\n", "\n", " You will have to set the following attributes after constructing this\n", " object for the filter to perform properly. Please note that there are\n", " various checks in place to ensure that you have made everything the\n", " 'correct' size. However, it is possible to provide incorrectly sized\n", " arrays such that the linear algebra can not perform an operation.\n", " It can also fail silently - you can end up with matrices of a size that\n", " allows the linear algebra to work, but are the wrong shape for the problem\n", " you are trying to solve.\n", "\n", " **Attributes**\n", "\n", " x : numpy.array(dim_x, 1)\n", " state estimate vector\n", "\n", " P : numpy.array(dim_x, dim_x)\n", " covariance estimate matrix\n", "\n", " R : numpy.array(dim_z, dim_z)\n", " measurement noise matrix\n", "\n", " Q : numpy.array(dim_x, dim_x)\n", " process noise matrix\n", "\n", " F : numpy.array()\n", " State Transition matrix\n", "\n", " H : numpy.array(dim_x, dim_x)\n", "\n", "\n", " You may read the following attributes.\n", "\n", " **Readable Attributes**\n", "\n", " y : numpy.array\n", " Residual of the update step.\n", "\n", " K : numpy.array(dim_x, dim_z)\n", " Kalman gain of the update step\n", "\n", " S : numpy.array\n", " Systen uncertaintly projected to measurement space\n", "\n", " \"\"\"\n", "\n", "\n", "\n", " def __init__(self, dim_x, dim_z, dim_u=0):\n", " \"\"\" Create a Kalman filter. You are responsible for setting the\n", " various state variables to reasonable values; the defaults below will\n", " not give you a functional filter.\n", "\n", " **Parameters**\n", "\n", " dim_x : int\n", " Number of state variables for the Kalman filter. For example, if\n", " you are tracking the position and velocity of an object in two\n", " dimensions, dim_x would be 4.\n", "\n", " This is used to set the default size of P, Q, and u\n", "\n", " dim_z : int\n", " Number of of measurement inputs. For example, if the sensor\n", " provides you with position in (x,y), dim_z would be 2.\n", "\n", " dim_u : int (optional)\n", " size of the control input, if it is being used.\n", " Default value of 0 indicates it is not used.\n", " \"\"\"\n", "\n", " assert dim_x > 0\n", " assert dim_z > 0\n", " assert dim_u >= 0\n", "\n", " self.dim_x = dim_x\n", " self.dim_z = dim_z\n", " self.dim_u = dim_u\n", "\n", " self._x = zeros((dim_x,1)) # state\n", " self._P = eye(dim_x) # uncertainty covariance\n", " self._Q = eye(dim_x) # process uncertainty\n", " self._B = 0 # control transition matrix\n", " self._F = 0 # state transition matrix\n", " self.H = 0 # Measurement function\n", " self.R = eye(dim_z) # state uncertainty\n", " self._alpha_sq = 1. # fading memory control\n", "\n", " # gain and residual are computed during the innovation step. We\n", " # save them so that in case you want to inspect them for various\n", " # purposes\n", " self._K = 0 # kalman gain\n", " self._y = zeros((dim_z, 1))\n", " self._S = 0 # system uncertainty in measurement space\n", "\n", " # identity matrix. Do not alter this.\n", " self._I = np.eye(dim_x)\n", "\n", "\n", " def update(self, z, R=None, H=None):\n", " \"\"\"\n", " Add a new measurement (z) to the kalman filter. If z is None, nothing\n", " is changed.\n", "\n", " **Parameters**\n", "\n", " z : np.array\n", " measurement for this update.\n", "\n", " R : np.array, scalar, or None\n", " Optionally provide R to override the measurement noise for this\n", " one call, otherwise self.R will be used.\n", " \"\"\"\n", "\n", " if z is None:\n", " return\n", "\n", " if R is None:\n", " R = self.R\n", " elif isscalar(R):\n", " R = eye(self.dim_z) * R\n", "\n", " # rename for readability and a tiny extra bit of speed\n", " if H is None:\n", " H = self.H\n", " P = self._P\n", " x = self._x\n", "\n", " # y = z - Hx\n", " # error (residual) between measurement and prediction\n", " self._y = z - dot(H, x)\n", "\n", " # S = HPH' + R\n", " # project system uncertainty into measurement space\n", " S = dot3(H, P, H.T) + R\n", "\n", " # K = PH'inv(S)\n", " # map system uncertainty into kalman gain\n", " K = dot3(P, H.T, linalg.inv(S))\n", "\n", " # x = x + Ky\n", " # predict new x with residual scaled by the kalman gain\n", " self._x = x + dot(K, self._y)\n", "\n", " # P = (I-KH)P(I-KH)' + KRK'\n", " I_KH = self._I - dot(K, H)\n", " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", "\n", " self._S = S\n", " self._K = K\n", "\n", "\n", " def test_matrix_dimensions(self):\n", " \"\"\" Performs a series of asserts to check that the size of everything\n", " is what it should be. This can help you debug problems in your design.\n", "\n", " This is only a test; you do not need to use it while filtering.\n", " However, to use you will want to perform at least one predict() and\n", " one update() before calling; some bad designs will cause the shapes\n", " of x and P to change in a silent and bad way. For example, if you\n", " pass in a badly dimensioned z into update that can cause x to be\n", " misshapen.\"\"\"\n", "\n", " assert self._x.shape == (self.dim_x, 1), \\\n", " \"Shape of x must be ({},{}), but is {}\".format(\n", " self.dim_x, 1, self._x.shape)\n", "\n", " assert self._P.shape == (self.dim_x, self.dim_x), \\\n", " \"Shape of P must be ({},{}), but is {}\".format(\n", " self.dim_x, self.dim_x, self._P.shape)\n", "\n", " assert self._Q.shape == (self.dim_x, self.dim_x), \\\n", " \"Shape of P must be ({},{}), but is {}\".format(\n", " self.dim_x, self.dim_x, self._P.shape)\n", "\n", "\n", " def predict(self, u=0):\n", " \"\"\" Predict next position.\n", "\n", " **Parameters**\n", "\n", " u : np.array\n", " Optional control vector. If non-zero, it is multiplied by B\n", " to create the control input into the system.\n", " \"\"\"\n", "\n", " # x = Fx + Bu\n", " self._x = dot(self._F, self.x) + dot(self._B, u)\n", "\n", " # P = FPF' + Q\n", " self._P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q\n", "\n", "\n", " def batch_filter(self, zs, Rs=None, update_first=False):\n", " \"\"\" Batch processes a sequences of measurements.\n", "\n", " **Parameters**\n", "\n", " zs : list-like\n", " list of measurements at each time step `self.dt` Missing\n", " measurements must be represented by 'None'.\n", "\n", " Rs : list-like, optional\n", " optional list of values to use for the measurement error\n", " covariance; a value of None in any position will cause the filter\n", " to use `self.R` for that time step.\n", "\n", " update_first : bool, optional,\n", " controls whether the order of operations is update followed by\n", " predict, or predict followed by update. Default is predict->update.\n", "\n", " **Returns**\n", "\n", "\n", " means: np.array((n,dim_x,1))\n", " array of the state for each time step after the update. Each entry\n", " is an np.array. In other words `means[k,:]` is the state at step\n", " `k`.\n", "\n", " covariance: np.array((n,dim_x,dim_x))\n", " array of the covariances for each time step after the update.\n", " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", "\n", " means_predictions: np.array((n,dim_x,1))\n", " array of the state for each time step after the predictions. Each\n", " entry is an np.array. In other words `means[k,:]` is the state at\n", " step `k`.\n", "\n", " covariance_predictions: np.array((n,dim_x,dim_x))\n", " array of the covariances for each time step after the prediction.\n", " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", " \"\"\"\n", "\n", " try:\n", " z = zs[0]\n", " except:\n", " assert not isscalar(zs), 'zs must be list-like'\n", "\n", " if self.dim_z == 1:\n", " assert isscalar(z) or (z.ndim==1 and len(z) == 1), \\\n", " 'zs must be a list of scalars or 1D, 1 element arrays'\n", "\n", " else:\n", " assert len(z) == self.dim_z, 'each element in zs must be a'\n", " '1D array of length {}'.format(self.dim_z)\n", "\n", " n = np.size(zs,0)\n", " if Rs is None:\n", " Rs = [None]*n\n", "\n", " # mean estimates from Kalman Filter\n", " if self.x.ndim == 1:\n", " means = zeros((n, self.dim_x))\n", " means_p = zeros((n, self.dim_x))\n", " else:\n", " means = zeros((n, self.dim_x, 1))\n", " means_p = zeros((n, self.dim_x, 1))\n", "\n", " # state covariances from Kalman Filter\n", " covariances = zeros((n, self.dim_x, self.dim_x))\n", " covariances_p = zeros((n, self.dim_x, self.dim_x))\n", "\n", " if update_first:\n", " for i, (z, r) in enumerate(zip(zs, Rs)):\n", " self.update(z, r)\n", " means[i,:] = self._x\n", " covariances[i,:,:] = self._P\n", "\n", " self.predict()\n", " means_p[i,:] = self._x\n", " covariances_p[i,:,:] = self._P\n", " else:\n", " for i, (z, r) in enumerate(zip(zs, Rs)):\n", " self.predict()\n", " means_p[i,:] = self._x\n", " covariances_p[i,:,:] = self._P\n", "\n", " self.update(z, r)\n", " means[i,:] = self._x\n", " covariances[i,:,:] = self._P\n", "\n", " return (means, covariances, means_p, covariances_p)\n", "\n", "\n", "\n", " def rts_smoother(self, Xs, Ps, Qs=None):\n", " \"\"\" Runs the Rauch-Tung-Striebal Kalman smoother on a set of\n", " means and covariances computed by a Kalman filter. The usual input\n", " would come from the output of `KalmanFilter.batch_filter()`.\n", "\n", " **Parameters**\n", "\n", " Xs : numpy.array\n", " array of the means (state variable x) of the output of a Kalman\n", " filter.\n", "\n", " Ps : numpy.array\n", " array of the covariances of the output of a kalman filter.\n", "\n", " Q : list-like collection of numpy.array, optional\n", " Process noise of the Kalman filter at each time step. Optional,\n", " if not provided the filter's self.Q will be used\n", "\n", "\n", " **Returns**\n", "\n", " 'x' : numpy.ndarray\n", " smoothed means\n", "\n", " 'P' : numpy.ndarray\n", " smoothed state covariances\n", "\n", " 'K' : numpy.ndarray\n", " smoother gain at each step\n", "\n", "\n", " **Example**::\n", "\n", " zs = [t + random.randn()*4 for t in range (40)]\n", "\n", " (mu, cov, _, _) = kalman.batch_filter(zs)\n", " (x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)\n", "\n", " \"\"\"\n", "\n", " assert len(Xs) == len(Ps)\n", " shape = Xs.shape\n", " n = shape[0]\n", " dim_x = shape[1]\n", "\n", " F = self._F\n", " if not Qs:\n", " Qs = [self.Q] * n\n", "\n", " # smoother gain\n", " K = zeros((n,dim_x,dim_x))\n", "\n", " x, P = Xs.copy(), Ps.copy()\n", "\n", " for k in range(n-2,-1,-1):\n", " P_pred = dot3(F, P[k], F.T) + Qs[k]\n", "\n", " K[k] = dot3(P[k], F.T, linalg.inv(P_pred))\n", " x[k] += dot (K[k], x[k+1] - dot(F, x[k]))\n", " P[k] += dot3 (K[k], P[k+1] - P_pred, K[k].T)\n", "\n", " return (x, P, K)\n", "\n", "\n", " def get_prediction(self, u=0):\n", " \"\"\" Predicts the next state of the filter and returns it. Does not\n", " alter the state of the filter.\n", "\n", " **Parameters**\n", "\n", " u : np.array\n", " optional control input\n", "\n", " **Returns**\n", "\n", " (x, P)\n", " State vector and covariance array of the prediction.\n", " \"\"\"\n", "\n", " x = dot(self._F, self._x) + dot(self._B, u)\n", " P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q\n", " return (x, P)\n", "\n", "\n", " def residual_of(self, z):\n", " \"\"\" returns the residual for the given measurement (z). Does not alter\n", " the state of the filter.\n", " \"\"\"\n", " return z - dot(self.H, self._x)\n", "\n", "\n", " def measurement_of_state(self, x):\n", " \"\"\" Helper function that converts a state into a measurement.\n", "\n", " **Parameters**\n", "\n", " x : np.array\n", " kalman state vector\n", "\n", " **Returns**\n", "\n", " z : np.array\n", " measurement corresponding to the given state\n", " \"\"\"\n", "\n", " return dot(self.H, x)\n", "\n", "\n", " @property\n", " def alpha(self):\n", " \"\"\" Fading memory setting. 1.0 gives the normal Kalman filter, and\n", " values slightly larger than 1.0 (such as 1.02) give a fading\n", " memory effect - previous measurements have less influence on the\n", " filter's estimates. This formulation of the Fading memory filter\n", " (there are many) is due to Dan Simon [1].\n", "\n", " ** References **\n", "\n", " [1] Dan Simon. \"Optimal State Estimation.\" John Wiley & Sons. \n", " p. 208-212. (2006)\n", " \"\"\"\n", "\n", " return self._alpha_sq**.5\n", "\n", "\n", " @alpha.setter\n", " def alpha(self, value):\n", " assert np.isscalar(value)\n", " assert value > 0\n", "\n", " self._alpha_sq = value**2\n", "\n", " @property\n", " def Q(self):\n", " \"\"\" Process uncertainty\"\"\"\n", " return self._Q\n", "\n", "\n", " @Q.setter\n", " def Q(self, value):\n", " self._Q = setter_scalar(value, self.dim_x)\n", "\n", " @property\n", " def P(self):\n", " \"\"\" covariance matrix\"\"\"\n", " return self._P\n", "\n", "\n", " @P.setter\n", " def P(self, value):\n", " self._P = setter_scalar(value, self.dim_x)\n", "\n", "\n", " @property\n", " def F(self):\n", " \"\"\" state transition matrix\"\"\"\n", " return self._F\n", "\n", "\n", " @F.setter\n", " def F(self, value):\n", " self._F = setter(value, self.dim_x, self.dim_x)\n", "\n", " @property\n", " def B(self):\n", " \"\"\" control transition matrix\"\"\"\n", " return self._B\n", "\n", "\n", " @B.setter\n", " def B(self, value):\n", " \"\"\" control transition matrix\"\"\"\n", " self._B = setter (value, self.dim_x, self.dim_u)\n", "\n", "\n", " @property\n", " def x(self):\n", " \"\"\" filter state vector.\"\"\"\n", " return self._x\n", "\n", "\n", " @x.setter\n", " def x(self, value):\n", " self._x = setter_1d(value, self.dim_x)\n", "\n", "\n", " @property\n", " def K(self):\n", " \"\"\" Kalman gain \"\"\"\n", " return self._K\n", "\n", "\n", " @property\n", " def y(self):\n", " \"\"\" measurement residual (innovation) \"\"\"\n", " return self._y\n", "\n", " @property\n", " def S(self):\n", " \"\"\" system uncertainty in measurement space \"\"\"\n", " return self._S\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ExtendedKalmanFilter" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/EKF.py\n", "\n", "\"\"\"Copyright 2014 Roger R Labbe Jr.\n", "\n", "filterpy library.\n", "http://github.com/rlabbe/filterpy\n", "\n", "Documentation at:\n", "https://filterpy.readthedocs.org\n", "\n", "Supporting book at:\n", "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", "\n", "This is licensed under an MIT license. See the readme.MD file\n", "for more information.\n", "\"\"\"\n", "\n", "from __future__ import (absolute_import, division, print_function,\n", " unicode_literals)\n", "import numpy as np\n", "import scipy.linalg as linalg\n", "from numpy import dot, zeros, eye\n", "from filterpy.common import setter, setter_1d, setter_scalar, dot3\n", "\n", "\n", "class ExtendedKalmanFilter(object):\n", "\n", " def __init__(self, dim_x, dim_z, dim_u=0):\n", " \"\"\" Extended Kalman filter. You are responsible for setting the\n", " various state variables to reasonable values; the defaults below will\n", " not give you a functional filter.\n", "\n", " **Parameters**\n", "\n", " dim_x : int\n", " Number of state variables for the Kalman filter. For example, if\n", " you are tracking the position and velocity of an object in two\n", " dimensions, dim_x would be 4.\n", "\n", " This is used to set the default size of P, Q, and u\n", "\n", " dim_z : int\n", " Number of of measurement inputs. For example, if the sensor\n", " provides you with position in (x,y), dim_z would be 2.\n", " \"\"\"\n", "\n", " self.dim_x = dim_x\n", " self.dim_z = dim_z\n", "\n", " self._x = zeros((dim_x,1)) # state\n", " self._P = eye(dim_x) # uncertainty covariance\n", " self._B = 0 # control transition matrix\n", " self._F = 0 # state transition matrix\n", " self._R = eye(dim_z) # state uncertainty\n", " self._Q = eye(dim_x) # process uncertainty\n", " self._y = zeros((dim_z, 1))\n", "\n", " # identity matrix. Do not alter this.\n", " self._I = np.eye(dim_x)\n", "\n", "\n", " def predict_update(self, z, HJacobian, Hx, u=0):\n", " \"\"\" Performs the predict/update innovation of the extended Kalman\n", " filter.\n", "\n", " **Parameters**\n", "\n", " z : np.array\n", " measurement for this step.\n", " If `None`, only predict step is perfomed.\n", "\n", " HJacobian : function\n", " function which computes the Jacobian of the H matrix (measurement\n", " function). Takes state variable (self.x) as input, returns H.\n", "\n", "\n", " Hx : function\n", " function which takes a state variable and returns the measurement\n", " that would correspond to that state.\n", "\n", " u : np.array or scalar\n", " optional control vector input to the filter.\n", " \"\"\"\n", " if np.isscalar(z) and self.dim_z == 1:\n", " z = np.asarray([z], float)\n", "\n", " F = self._F\n", " B = self._B\n", " P = self._P\n", " Q = self._Q\n", " R = self._R\n", " x = self._x\n", "\n", " H = HJacobian(x)\n", "\n", " # predict step\n", " x = dot(F, x) + dot(B, u)\n", " P = dot3(F, P, F.T) + Q\n", "\n", " # update step\n", " S = dot3(H, P, H.T) + R\n", " K = dot3(P, H.T, linalg.inv (S))\n", "\n", " self._x = x + dot(K, (z - Hx(x)))\n", "\n", " I_KH = self._I - dot(K, H)\n", " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", "\n", "\n", " def update(self, z, HJacobian, Hx, R=None):\n", " \"\"\" Performs the update innovation of the extended Kalman filter.\n", "\n", " **Parameters**\n", "\n", " z : np.array\n", " measurement for this step.\n", " If `None`, only predict step is perfomed.\n", "\n", " HJacobian : function\n", " function which computes the Jacobian of the H matrix (measurement\n", " function). Takes state variable (self.x) as input, returns H.\n", "\n", "\n", " Hx : function\n", " function which takes a state variable and returns the measurement\n", " that would correspond to that state.\n", " \"\"\"\n", "\n", " P = self._P\n", " if R is None:\n", " R = self._R\n", " elif np.isscalar(R):\n", " R = eye(self.dim_z) * R\n", "\n", " if np.isscalar(z) and self.dim_z == 1:\n", " z = np.asarray([z], float)\n", "\n", " x = self._x\n", "\n", " H = HJacobian(x)\n", "\n", " S = dot3(H, P, H.T) + R\n", " K = dot3(P, H.T, linalg.inv (S))\n", "\n", " y = z - Hx(x)\n", " self._x = x + dot(K, y)\n", "\n", " I_KH = self._I - dot(K, H)\n", " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", "\n", "\n", " def predict_x(self, u=0):\n", " \"\"\" predicts the next state of X. If you need to\n", " compute the next state yourself, override this function. You would\n", " need to do this, for example, if the usual Taylor expansion to\n", " generate F is not providing accurate results for you. \"\"\"\n", "\n", " self._x = dot(self._F, self._x) + dot(self._B, u)\n", "\n", "\n", " def predict(self, u=0):\n", " \"\"\" Predict next position.\n", "\n", " **Parameters**\n", "\n", " u : np.array\n", " Optional control vector. If non-zero, it is multiplied by B\n", " to create the control input into the system.\n", " \"\"\"\n", "\n", " self.predict_x(u)\n", " self._P = dot3(self._F, self._P, self._F.T) + self._Q\n", "\n", "\n", " @property\n", " def Q(self):\n", " \"\"\" Process uncertainty\"\"\"\n", " return self._Q\n", "\n", "\n", " @Q.setter\n", " def Q(self, value):\n", " self._Q = setter_scalar(value, self.dim_x)\n", "\n", "\n", " @property\n", " def P(self):\n", " \"\"\" covariance matrix\"\"\"\n", " return self._P\n", "\n", "\n", " @P.setter\n", " def P(self, value):\n", " self._P = setter_scalar(value, self.dim_x)\n", "\n", "\n", " @property\n", " def R(self):\n", " \"\"\" measurement uncertainty\"\"\"\n", " return self._R\n", "\n", "\n", " @R.setter\n", " def R(self, value):\n", " self._R = setter_scalar(value, self.dim_z)\n", "\n", "\n", " @property\n", " def F(self):\n", " return self._F\n", "\n", "\n", " @F.setter\n", " def F(self, value):\n", " self._F = setter(value, self.dim_x, self.dim_x)\n", "\n", "\n", " @property\n", " def B(self):\n", " return self._B\n", "\n", "\n", " @B.setter\n", " def B(self, value):\n", " \"\"\" control transition matrix\"\"\"\n", " self._B = setter(value, self.dim_x, self.dim_u)\n", "\n", "\n", " @property\n", " def x(self):\n", " return self._x\n", "\n", " @x.setter\n", " def x(self, value):\n", " self._x = setter_1d(value, self.dim_x)\n", "\n", " @property\n", " def K(self):\n", " \"\"\" Kalman gain \"\"\"\n", " return self._K\n", "\n", " @property\n", " def y(self):\n", " \"\"\" measurement residual (innovation) \"\"\"\n", " return self._y\n", "\n", " @property\n", " def S(self):\n", " \"\"\" system uncertainty in measurement space \"\"\"\n", " return self._S\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## UnscentedKalmanFilter" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/UKF.py\n", "\"\"\"Copyright 2014 Roger R Labbe Jr.\n", "\n", "filterpy library.\n", "http://github.com/rlabbe/filterpy\n", "\n", "Documentation at:\n", "https://filterpy.readthedocs.org\n", "\n", "Supporting book at:\n", "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", "\n", "This is licensed under an MIT license. See the readme.MD file\n", "for more information.\n", "\"\"\"\n", "\n", "# pylint bug - warns about numpy functions which do in fact exist.\n", "# pylint: disable=E1101\n", "\n", "\n", "#I like aligning equal signs for readability of math\n", "# pylint: disable=C0326\n", "\n", "from __future__ import (absolute_import, division, print_function,\n", " unicode_literals)\n", "\n", "from numpy.linalg import inv, cholesky\n", "import numpy as np\n", "from numpy import asarray, eye, zeros, dot, isscalar, outer\n", "from filterpy.common import dot3\n", "\n", "\n", "class UnscentedKalmanFilter(object):\n", " # pylint: disable=too-many-instance-attributes\n", " # pylint: disable=C0103\n", " \"\"\" Implements the Unscented Kalman filter (UKF) as defined by Simon J.\n", " Julier and Jeffery K. Uhlmann [1]. Succintly, the UKF selects a set of\n", " sigma points and weights inside the covariance matrix of the filter's\n", " state. These points are transformed through the nonlinear process being\n", " filtered, and are rebuilt into a mean and covariance by computed the\n", " weighted mean and expected value of the transformed points. Read the paper;\n", " it is excellent. My book \"Kalman and Bayesian Filters in Python\" [2]\n", " explains the algorithm, develops this code, and provides examples of the\n", " filter in use.\n", "\n", "\n", " You will have to set the following attributes after constructing this\n", " object for the filter to perform properly.\n", "\n", " **Attributes**\n", "\n", " x : numpy.array(dim_x)\n", " state estimate vector\n", "\n", " P : numpy.array(dim_x, dim_x)\n", " covariance estimate matrix\n", "\n", " R : numpy.array(dim_z, dim_z)\n", " measurement noise matrix\n", "\n", " Q : numpy.array(dim_x, dim_x)\n", " process noise matrix\n", "\n", "\n", " You may read the following attributes.\n", "\n", " **Readable Attributes**\n", "\n", " Pxz : numpy.aray(dim_x, dim_z)\n", " Cross variance of x and z computed during update() call.\n", "\n", "\n", " **References**\n", "\n", " .. [1] Julier, Simon J.; Uhlmann, Jeffrey \"A New Extension of the Kalman\n", " Filter to Nonlinear Systems\". Proc. SPIE 3068, Signal Processing,\n", " Sensor Fusion, and Target Recognition VI, 182 (July 28, 1997)\n", "\n", " .. [2] Labbe, Roger R. \"Kalman and Bayesian Filters in Python\"\n", "\n", " https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", " \"\"\"\n", "\n", " def __init__(self, dim_x, dim_z, dt, hx, fx, kappa=0.):\n", " \"\"\" Create a Kalman filter. You are responsible for setting the\n", " various state variables to reasonable values; the defaults below will\n", " not give you a functional filter.\n", "\n", " **Parameters**\n", "\n", " dim_x : int\n", " Number of state variables for the filter. For example, if\n", " you are tracking the position and velocity of an object in two\n", " dimensions, dim_x would be 4.\n", "\n", "\n", " dim_z : int\n", " Number of of measurement inputs. For example, if the sensor\n", " provides you with position in (x,y), dim_z would be 2.\n", "\n", " dt : float\n", " Time between steps in seconds.\n", "\n", " hx : function(x)\n", " Measurement function. Converts state vector x into a measurement\n", " vector of shape (dim_z).\n", "\n", " fx : function(x,dt)\n", " function that returns the state x transformed by the\n", " state transistion function. dt is the time step in seconds.\n", "\n", " kappa : float, default=0.\n", " Scaling factor that can reduce high order errors. kappa=0 gives\n", " the standard unscented filter. According to [1], if you set\n", " kappa to 3-dim_x for a Gaussian x you will minimize the fourth\n", " order errors in x and P.\n", "\n", " **References**\n", "\n", " [1] S. Julier, J. Uhlmann, and H. Durrant-Whyte. \"A new method for\n", " the nonlinear transformation of means and covariances in filters\n", " and estimators,\" IEEE Transactions on Automatic Control, 45(3),\n", " pp. 477-482 (March 2000).\n", " \"\"\"\n", "\n", " self.Q = eye(dim_x)\n", " self.R = eye(dim_z)\n", " self.x = zeros(dim_x)\n", " self.P = eye(dim_x)\n", " self._dim_x = dim_x\n", " self._dim_z = dim_z\n", " self._dt = dt\n", " self._num_sigmas = 2*dim_x + 1\n", " self.kappa = kappa\n", " self.hx = hx\n", " self.fx = fx\n", "\n", " # weights for the sigma points\n", " self.W = self.weights(dim_x, kappa)\n", "\n", " # sigma points transformed through f(x) and h(x)\n", " # variables for efficiency so we don't recreate every update\n", " self.sigmas_f = zeros((self._num_sigmas, self._dim_x))\n", "\n", "\n", " def update(self, z, R=None, residual=np.subtract, UT=None):\n", " \"\"\" Update the UKF with the given measurements. On return,\n", " self.x and self.P contain the new mean and covariance of the filter.\n", "\n", " **Parameters**\n", "\n", " z : numpy.array of shape (dim_z)\n", " measurement vector\n", "\n", " R : numpy.array((dim_z, dim_z)), optional\n", " Measurement noise. If provided, overrides self.R for\n", " this function call.\n", "\n", " residual : function (z, z2), optional\n", " Optional function that computes the residual (difference) between\n", " the two measurement vectors. If you do not provide this, then the\n", " built in minus operator will be used. You will normally want to use\n", " the built in unless your residual computation is nonlinear (for\n", " example, if they are angles)\n", "\n", " UT : function(sigmas, Wm, Wc, noise_cov), optional\n", " Optional function to compute the unscented transform for the sigma\n", " points passed through hx. Typically the default function will\n", " work, but if for example you are using angles the default method\n", " of computing means and residuals will not work, and you will have\n", " to define how to compute it.\n", " \"\"\"\n", "\n", " if isscalar(z):\n", " dim_z = 1\n", " else:\n", " dim_z = len(z)\n", "\n", " if R is None:\n", " R = self.R\n", " elif np.isscalar(R):\n", " R = eye(self._dim_z) * R\n", "\n", " # rename for readability\n", " sigmas_f = self.sigmas_f\n", " sigmas_h = zeros((self._num_sigmas, dim_z))\n", "\n", " if UT is None:\n", " UT = unscented_transform\n", "\n", " # transform sigma points into measurement space\n", " for i in range(self._num_sigmas):\n", " sigmas_h[i] = self.hx(sigmas_f[i])\n", "\n", " # mean and covariance of prediction passed through inscented transform\n", " zp, Pz = UT(sigmas_h, self.W, self.W, R)\n", "\n", " # compute cross variance of the state and the measurements\n", " '''self.Pxz = zeros((self._dim_x, dim_z))\n", " for i in range(self._num_sigmas):\n", " self.Pxz += self.W[i] * np.outer(sigmas_f[i] - self.x,\n", " residual(sigmas_h[i], zp))'''\n", "\n", " # this is the unreadable but fast implementation of the\n", " # commented out loop above\n", " yh = sigmas_f - self.x[np.newaxis, :]\n", " yz = residual(sigmas_h, zp[np.newaxis, :])\n", " self.Pxz = yh.T.dot(np.diag(self.W)).dot(yz)\n", "\n", " K = dot(self.Pxz, inv(Pz)) # Kalman gain\n", " y = residual(z, zp)\n", "\n", " self.x = self.x + dot(K, y)\n", " self.P = self.P - dot3(K, Pz, K.T)\n", "\n", "\n", "\n", " def predict(self, dt=None):\n", " \"\"\" Performs the predict step of the UKF. On return, self.xp and\n", " self.Pp contain the predicted state (xp) and covariance (Pp). 'p'\n", " stands for prediction.\n", "\n", " **Parameters**\n", " dt : double, optional\n", " If specified, the time step to be used for this prediction.\n", " self._dt is used if this is not provided.\n", "\n", " Important: this MUST be called before update() is called for the\n", " first time.\n", " \"\"\"\n", "\n", " if dt is None:\n", " dt = self._dt\n", "\n", " # calculate sigma points for given mean and covariance\n", " sigmas = self.sigma_points(self.x, self.P, self.kappa)\n", "\n", " for i in range(self._num_sigmas):\n", " self.sigmas_f[i] = self.fx(sigmas[i], dt)\n", "\n", " self.x, self.P = unscented_transform(\n", " self.sigmas_f, self.W, self.W, self.Q)\n", "\n", "\n", " def batch_filter(self, zs, Rs=None, residual=np.subtract, UT=None):\n", " \"\"\" Performs the UKF filter over the list of measurement in `zs`.\n", "\n", "\n", " **Parameters**\n", "\n", " zs : list-like\n", " list of measurements at each time step `self._dt` Missing\n", " measurements must be represented by 'None'.\n", "\n", " Rs : list-like, optional\n", " optional list of values to use for the measurement error\n", " covariance; a value of None in any position will cause the filter\n", " to use `self.R` for that time step.\n", "\n", " residual : function (z, z2), optional\n", " Optional function that computes the residual (difference) between\n", " the two measurement vectors. If you do not provide this, then the\n", " built in minus operator will be used. You will normally want to use\n", " the built in unless your residual computation is nonlinear (for\n", " example, if they are angles)\n", "\n", " UT : function(sigmas, Wm, Wc, noise_cov), optional\n", " Optional function to compute the unscented transform for the sigma\n", " points passed through hx. Typically the default function will\n", " work, but if for example you are using angles the default method\n", " of computing means and residuals will not work, and you will have\n", " to define how to compute it.\n", "\n", " **Returns**\n", "\n", " means: np.array((n,dim_x,1))\n", " array of the state for each time step after the update. Each entry\n", " is an np.array. In other words `means[k,:]` is the state at step\n", " `k`.\n", "\n", " covariance: np.array((n,dim_x,dim_x))\n", " array of the covariances for each time step after the update.\n", " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", " \n", " \"\"\"\n", "\n", " try:\n", " z = zs[0]\n", " except:\n", " assert not isscalar(zs), 'zs must be list-like'\n", "\n", " if self._dim_z == 1:\n", " assert isscalar(z) or (z.ndim==1 and len(z) == 1), \\\n", " 'zs must be a list of scalars or 1D, 1 element arrays'\n", "\n", " else:\n", " assert len(z) == self._dim_z, 'each element in zs must be a' \\\n", " '1D array of length {}'.format(self._dim_z)\n", "\n", " n = np.size(zs,0)\n", " if Rs is None:\n", " Rs = [None]*n\n", "\n", " # mean estimates from Kalman Filter\n", " if self.x.ndim == 1:\n", " means = zeros((n, self._dim_x))\n", " else:\n", " means = zeros((n, self._dim_x, 1))\n", "\n", " # state covariances from Kalman Filter\n", " covariances = zeros((n, self._dim_x, self._dim_x))\n", " \n", " for i, (z, r) in enumerate(zip(zs, Rs)):\n", " self.predict()\n", " self.update(z, r)\n", " means[i,:] = self.x\n", " covariances[i,:,:] = self.P\n", " \n", " return (means, covariances)\n", "\n", " \n", "\n", " def rts_smoother(self, Xs, Ps, Qs=None, dt=None):\n", " \"\"\" Runs the Rauch-Tung-Striebal Kalman smoother on a set of\n", " means and covariances computed by the UKF. The usual input\n", " would come from the output of `batch_filter()`.\n", "\n", " **Parameters**\n", "\n", " Xs : numpy.array\n", " array of the means (state variable x) of the output of a Kalman\n", " filter.\n", "\n", " Ps : numpy.array\n", " array of the covariances of the output of a kalman filter.\n", "\n", " Q : list-like collection of numpy.array, optional\n", " Process noise of the Kalman filter at each time step. Optional,\n", " if not provided the filter's self.Q will be used\n", "\n", " dt : optional, float or array-like of float\n", " If provided, specifies the time step of each step of the filter.\n", " If float, then the same time step is used for all steps. If\n", " an array, then each element k contains the time at step k.\n", " Units are seconds.\n", "\n", " **Returns**\n", "\n", " 'x' : numpy.ndarray\n", " smoothed means\n", "\n", " 'P' : numpy.ndarray\n", " smoothed state covariances\n", "\n", " 'K' : numpy.ndarray\n", " smoother gain at each step\n", "\n", "\n", " **Example**::\n", "\n", " zs = [t + random.randn()*4 for t in range (40)]\n", "\n", " (mu, cov, _, _) = kalman.batch_filter(zs)\n", " (x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)\n", "\n", " \"\"\"\n", " assert len(Xs) == len(Ps)\n", " n, dim_x = Xs.shape\n", "\n", " if dt is None:\n", " dt = [self._dt] * n\n", " elif isscalar(dt):\n", " dt = [dt] * n\n", "\n", " if Qs is None:\n", " Qs = [self.Q] * n\n", "\n", " # smoother gain\n", " Ks = zeros((n,dim_x,dim_x))\n", "\n", " num_sigmas = 2*dim_x + 1\n", "\n", " xs, ps = Xs.copy(), Ps.copy()\n", " sigmas_f = zeros((num_sigmas, dim_x))\n", "\n", "\n", " for k in range(n-2,-1,-1):\n", " # create sigma points from state estimate, pass through state func\n", " sigmas = self.sigma_points(xs[k], ps[k], self.kappa)\n", " for i in range(num_sigmas):\n", " sigmas_f[i] = self.fx(sigmas[i], dt[k])\n", "\n", " # compute backwards prior state and covariance\n", " xb = dot(self.W, sigmas_f)\n", " Pb = 0\n", " x = Xs[k]\n", " for i in range(num_sigmas):\n", " y = sigmas_f[i] - x\n", " Pb += self.W[i] * outer(y, y)\n", " Pb += Qs[k]\n", "\n", " # compute cross variance\n", " Pxb = 0\n", " for i in range(num_sigmas):\n", " z = sigmas[i] - Xs[k]\n", " y = sigmas_f[i] - xb\n", " Pxb += self.W[i] * outer(z, y)\n", "\n", " # compute gain\n", " K = dot(Pxb, inv(Pb))\n", "\n", " # update the smoothed estimates\n", " xs[k] += dot (K, xs[k+1] - xb)\n", " ps[k] += dot3(K, ps[k+1] - Pb, K.T)\n", " Ks[k] = K\n", "\n", " return (xs, ps, Ks)\n", "\n", "\n", " @staticmethod\n", " def weights(n, kappa):\n", " \"\"\" Computes the weights for an unscented Kalman filter. See\n", " __init__() for meaning of parameters.\n", " \"\"\"\n", "\n", " assert n > 0, \"n must be greater than 0, it's value is {}\".format(n)\n", "\n", " k = .5 / (n+kappa)\n", " W = np.full(2*n+1, k)\n", " W[0] = kappa / (n+kappa)\n", " return W\n", "\n", "\n", " @staticmethod\n", " def sigma_points(x, P, kappa):\n", " \"\"\" Computes the sigma points for an unscented Kalman filter\n", " given the mean (x) and covariance(P) of the filter.\n", " kappa is an arbitrary constant. Returns sigma points.\n", "\n", " Works with both scalar and array inputs:\n", " sigma_points (5, 9, 2) # mean 5, covariance 9\n", " sigma_points ([5, 2], 9*eye(2), 2) # means 5 and 2, covariance 9I\n", "\n", " **Parameters**\n", "\n", " X An array-like object of the means of length n\n", " Can be a scalar if 1D.\n", " examples: 1, [1,2], np.array([1,2])\n", "\n", " P : scalar, or np.array\n", " Covariance of the filter. If scalar, is treated as eye(n)*P.\n", "\n", " kappa : float\n", " Scaling factor.\n", "\n", " **Returns**\n", "\n", " sigmas : np.array, of size (n, 2n+1)\n", " 2D array of sigma points. Each column contains all of\n", " the sigmas for one dimension in the problem space. They\n", " are ordered as:\n", "\n", " .. math::\n", " sigmas[0] = x \\n\n", " sigmas[1..n] = x + [\\sqrt{(n+\\kappa)P}]_k \\n\n", " sigmas[n+1..2n] = x - [\\sqrt{(n+\\kappa)P}]_k\n", " \"\"\"\n", "\n", " if np.isscalar(x):\n", " x = asarray([x])\n", " n = np.size(x) # dimension of problem\n", "\n", " if np.isscalar(P):\n", " P = eye(n)*P\n", "\n", " sigmas = zeros((2*n+1, n))\n", "\n", " # implements U'*U = (n+kappa)*P. Returns lower triangular matrix.\n", " # Take transpose so we can access with U[i]\n", " U = cholesky((n+kappa)*P).T\n", " #U = sqrtm((n+kappa)*P).T\n", "\n", " sigmas[0] = x\n", " sigmas[1:n+1] = x + U\n", " sigmas[n+1:2*n+2] = x - U\n", "\n", " return sigmas\n", "\n", "\n", "def unscented_transform(Sigmas, Wm, Wc, noise_cov):\n", " \"\"\" Computes unscented transform of a set of sigma points and weights.\n", " returns the mean and covariance in a tuple.\n", " \"\"\"\n", "\n", " kmax, n = Sigmas.shape\n", "\n", " # new mean is just the sum of the sigmas * weight\n", " x = dot(Wm, Sigmas) # dot = \\Sigma^n_1 (W[k]*Xi[k])\n", "\n", " # new covariance is the sum of the outer product of the residuals\n", " # times the weights\n", " '''P = zeros((n, n))\n", " for k in range(kmax):\n", " y = Sigmas[k] - x\n", " P += Wc[k] * np.outer(y, y)'''\n", "\n", " # this is the fast way to do the commented out code above\n", " y = Sigmas - x[np.newaxis,:]\n", " P = y.T.dot(np.diag(Wc)).dot(y)\n", "\n", " if noise_cov is not None:\n", " P += noise_cov\n", "\n", " return (x, P)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.4.3" } }, "nbformat": 4, "nbformat_minor": 0 }