{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## FastSLAM1.0" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "" ] }, "execution_count": 1, "metadata": { "image/png": { "width": 600 } }, "output_type": "execute_result" } ], "source": [ "from IPython.display import Image\n", "Image(filename=\"animation.png\",width=600)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simulation\n", "\n", "This is a feature-based SLAM example using FastSLAM 1.0.\n", "\n", "The blue line is ground truth, the black line is dead reckoning, the red\n", "line is the estimated trajectory with FastSLAM.\n", "\n", "The red points are particles of FastSLAM.\n", "\n", "Black points are landmarks, blue crosses are estimated landmark\n", "positions by FastSLAM.\n", "\n", "\n", "![gif](https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/SLAM/FastSLAM1/animation.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Introduction\n", "\n", "The FastSLAM algorithm is based on particle filters and belongs to the family of probabilistic SLAM approaches. It is used with feature-based maps (see gif above) or with occupancy grid maps.\n", "\n", "As shown, the particle filter differs from the EKF by representing the robot's estimate with a set of particles. Each particle holds an independent belief: a pose $(x, y, \\theta)$ and an array of landmark locations $[(x_1, y_1), (x_2, y_2), ... (x_n, y_n)]$ for $n$ landmarks.\n", "\n", "* The blue line is the true trajectory\n", "* The red line is the estimated trajectory\n", "* The red dots represent the distribution of particles\n", "* The black line represents the dead reckoning trajectory\n", "* The blue x marks are the observed and estimated landmarks\n", "* The black x marks are the true landmarks\n", "\n", "I.e. 
Each particle maintains a deterministic pose and $n$ EKFs, one per landmark, and updates them with each measurement." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Algorithm walkthrough\n", "\n", "The particles are initially drawn from a uniform distribution to represent the initial uncertainty.\n", "At each time step we do:\n", "\n", "* Predict the pose of each particle using $u$ and the motion model (the landmarks are not updated).\n", "* Update the particles with the observations $z$: the weights are adjusted based on how likely each particle is to have the correct pose given the sensor measurement.\n", "* Resample so that the particles with the largest weights survive and the unlikely ones with the lowest weights die out.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1- Predict\n", "The following equations and code snippets show how the particle distribution evolves when only the control $(v,w)$ is provided, where $v$ and $w$ are the linear and angular velocity, respectively. 
\n", "\n", "$\\begin{equation*}\n", "F=\n", "\\begin{bmatrix}\n", "1 & 0 & 0 \\\\\n", "0 & 1 & 0 \\\\\n", "0 & 0 & 1 \n", "\\end{bmatrix}\n", "\\end{equation*}$\n", "\n", "$\\begin{equation*}\n", "B=\n", "\\begin{bmatrix}\n", "\\Delta t \\cos(\\theta) & 0\\\\\n", "\\Delta t \\sin(\\theta) & 0\\\\\n", "0 & \\Delta t\n", "\\end{bmatrix}\n", "\\end{equation*}$\n", "\n", "$\\begin{equation*}\n", "X = FX + BU \n", "\\end{equation*}$\n", "\n", "\n", "$\\begin{equation*}\n", "\\begin{bmatrix}\n", "x_{t+1} \\\\\n", "y_{t+1} \\\\\n", "\\theta_{t+1}\n", "\\end{bmatrix}=\n", "\\begin{bmatrix}\n", "1 & 0 & 0 \\\\\n", "0 & 1 & 0 \\\\\n", "0 & 0 & 1 \n", "\\end{bmatrix}\\begin{bmatrix}\n", "x_{t} \\\\\n", "y_{t} \\\\\n", "\\theta_{t}\n", "\\end{bmatrix}+\n", "\\begin{bmatrix}\n", "\\Delta t \\cos(\\theta) & 0\\\\\n", "\\Delta t \\sin(\\theta) & 0\\\\\n", "0 & \\Delta t\n", "\\end{bmatrix}\n", "\\begin{bmatrix}\n", "v_{t} + \\sigma_v\\\\\n", "w_{t} + \\sigma_w\\\\\n", "\\end{bmatrix}\n", "\\end{equation*}$\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following snippets play back the recorded trajectory of each particle.\n", "\n", "To gain insight into the motion model, change the value of $R$ and re-run the cells. $R$ is the parameter that indicates how much we trust that the robot executed the motion commands.\n", "\n", "It is also interesting to note that motion alone increases the uncertainty in the system, as the particles spread out more and more. If observations are included, the uncertainty decreases and the particles converge to the correct estimate."
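, "\n", "\n", "As a rough worked example of this growth (a sketch under simplifying assumptions, not a derivation from the code): assume the velocity noise at each step is independent with standard deviation $\\sigma_v$, taken here as the square root of the first diagonal entry of $R$, and ignore the heading noise. After $N$ prediction steps, the along-track position error then has a standard deviation of roughly\n", "\n", "$\\begin{equation*}\n", "\\sigma_{pos}(N) \\approx \\sqrt{N}\\,\\Delta t\\,\\sigma_v\n", "\\end{equation*}$\n", "\n", "With $\\Delta t = 0.1$ s and $\\sigma_v = 1.0$ m/s (the values used in the code below), this gives about $0.1$ m after one step and $\\sqrt{500} \\cdot 0.1 \\approx 2.2$ m after $N = 500$ steps. Under these assumptions the spread keeps growing until observations are used to reweight the particles."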
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# CODE SNIPPET #\n", "import numpy as np\n", "import math\n", "from copy import deepcopy\n", "# Fast SLAM covariance\n", "Q = np.diag([3.0, np.deg2rad(10.0)])**2\n", "R = np.diag([1.0, np.deg2rad(20.0)])**2\n", "\n", "# Simulation parameter\n", "Qsim = np.diag([0.3, np.deg2rad(2.0)])**2\n", "Rsim = np.diag([0.5, np.deg2rad(10.0)])**2\n", "OFFSET_YAWRATE_NOISE = 0.01\n", "\n", "DT = 0.1  # time tick [s]\n", "SIM_TIME = 50.0  # simulation time [s]\n", "MAX_RANGE = 20.0  # maximum observation range\n", "M_DIST_TH = 2.0  # Threshold of Mahalanobis distance for data association.\n", "STATE_SIZE = 3  # State size [x,y,yaw]\n", "LM_SIZE = 2  # LM state size [x,y]\n", "N_PARTICLE = 100  # number of particles\n", "NTH = N_PARTICLE / 1.5  # Number of particles for re-sampling\n", "\n", "class Particle:\n", "\n", "    def __init__(self, N_LM):\n", "        self.w = 1.0 / N_PARTICLE\n", "        self.x = 0.0\n", "        self.y = 0.0\n", "        self.yaw = 0.0\n", "        # landmark x-y positions\n", "        self.lm = np.zeros((N_LM, LM_SIZE))\n", "        # landmark position covariance\n", "        self.lmP = np.zeros((N_LM * LM_SIZE, LM_SIZE))\n", "\n", "def motion_model(x, u):\n", "    F = np.array([[1.0, 0, 0],\n", "                  [0, 1.0, 0],\n", "                  [0, 0, 1.0]])\n", "\n", "    B = np.array([[DT * math.cos(x[2, 0]), 0],\n", "                  [DT * math.sin(x[2, 0]), 0],\n", "                  [0.0, DT]])\n", "    x = F @ x + B @ u\n", "\n", "    x[2, 0] = pi_2_pi(x[2, 0])  # keep the yaw angle in [-pi, pi)\n", "    return x\n", "\n", "def predict_particles(particles, u):\n", "    for i in range(N_PARTICLE):\n", "        px = np.zeros((STATE_SIZE, 1))\n", "        px[0, 0] = particles[i].x\n", "        px[1, 0] = particles[i].y\n", "        px[2, 0] = particles[i].yaw\n", "        # add noise; R is a (diagonal) covariance, so scale by its square root\n", "        ud = u + (np.random.randn(1, 2) @ R ** 0.5).T\n", "        px = motion_model(px, ud)\n", "        particles[i].x = px[0, 0]\n", "        particles[i].y = px[1, 0]\n", "        particles[i].yaw = px[2, 0]\n", "\n", "    return particles\n", "\n", "def pi_2_pi(angle):\n", "    return (angle + math.pi) % (2 * math.pi) - math.pi\n", 
"\n", "# END OF SNIPPET\n", "\n", "N_LM = 0 \n", "particles = [Particle(N_LM) for i in range(N_PARTICLE)]\n", "time= 0.0\n", "v = 1.0 # [m/s]\n", "yawrate = 0.1 # [rad/s]\n", "u = np.array([v, yawrate]).reshape(2, 1)\n", "history = []\n", "while SIM_TIME >= time:\n", " time += DT\n", " particles = predict_particles(particles, u)\n", " history.append(deepcopy(particles))\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "3f279cf1dcaf4ec886c01942c36e601d", "version_major": 2, "version_minor": 0 }, "text/plain": [ "interactive(children=(IntSlider(value=0, description='t', max=499), Output()), _dom_classes=('widget-interact'…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# from IPython.html.widgets import *\n", "from ipywidgets import *\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline\n", "\n", "# playback the recorded motion of the particles\n", "def plot_particles(t=0):\n", " x = []\n", " y = []\n", " for i in range(len(history[t])):\n", " x.append(history[t][i].x)\n", " y.append(history[t][i].y)\n", " plt.figtext(0.15,0.82,'t = ' + str(t))\n", " plt.plot(x, y, '.r')\n", " plt.axis([-20,20, -5,25])\n", "\n", "interact(plot_particles, t=(0,len(history)-1,1));" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2- Update\n", "\n", "For the update step it is useful to observe a single particle and the effect of getting a new measurements on the weight of the particle.\n", "\n", "As mentioned earlier, each particle maintains $N$ $2x2$ EKFs to estimate the landmarks, which includes the EKF process described in the EKF notebook. 
The difference is the change in the weight of the particle according to how likely the measurement is.\n", "\n", "The weight is updated according to the following equation: \n", "\n", "$\\begin{equation*}\n", "w_i = |2\\pi Q|^{\\frac{-1}{2}} \\exp\\{\\frac{-1}{2}(z_t - \\hat z_i)^T Q^{-1}(z_t-\\hat z_i)\\}\n", "\\end{equation*}$\n", "\n", "Here $w_i$ is the computed weight, $Q$ is the measurement covariance (in the code below, the innovation covariance $S_f = H_f P_f H_f^T + Q$ takes its place), $z_t$ is the actual measurement and $\\hat z_i$ is the predicted measurement of particle $i$.\n", "\n", "To experiment with this, a single particle is initialized and then passed an initial measurement, which results in a relatively average weight. However, setting the particle coordinate to a wrong value, to simulate a wrong estimate, results in a very low weight. The lower the weight, the less likely it is that the particle will be drawn during resampling, so it will probably die out." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "initial weight 1.0\n", "weight after landmark intialization 1.0\n", "weight after update 0.023098460073039763\n", "weight after wrong prediction 7.951154575772496e-07\n" ] } ], "source": [ "# CODE SNIPPET #\n", "def observation(xTrue, xd, u, RFID):\n", "\n", "    # calc true state\n", "    xTrue = motion_model(xTrue, u)\n", "\n", "    # add noise to range and bearing observations\n", "    # (Qsim and Rsim are covariances, so scale by their square roots)\n", "    z = np.zeros((3, 0))\n", "    for i in range(len(RFID[:, 0])):\n", "\n", "        dx = RFID[i, 0] - xTrue[0, 0]\n", "        dy = RFID[i, 1] - xTrue[1, 0]\n", "        d = math.sqrt(dx**2 + dy**2)\n", "        angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])\n", "        if d <= MAX_RANGE:\n", "            dn = d + np.random.randn() * Qsim[0, 0] ** 0.5  # add noise\n", "            anglen = angle + np.random.randn() * Qsim[1, 1] ** 0.5  # add noise\n", "            zi = np.array([dn, pi_2_pi(anglen), i]).reshape(3, 1)\n", "            z = np.hstack((z, zi))\n", "\n", "    # add noise to input\n", "    ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0] ** 0.5\n", "    ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1] ** 0.5 + 
OFFSET_YAWRATE_NOISE\n", "    ud = np.array([ud1, ud2]).reshape(2, 1)\n", "\n", "    xd = motion_model(xd, ud)\n", "\n", "    return xTrue, z, xd, ud\n", "\n", "def update_with_observation(particles, z):\n", "    for iz in range(len(z[0, :])):\n", "\n", "        lmid = int(z[2, iz])\n", "\n", "        for ip in range(N_PARTICLE):\n", "            # new landmark\n", "            if abs(particles[ip].lm[lmid, 0]) <= 0.01:\n", "                particles[ip] = add_new_lm(particles[ip], z[:, iz], Q)\n", "            # known landmark\n", "            else:\n", "                w = compute_weight(particles[ip], z[:, iz], Q)\n", "                particles[ip].w *= w\n", "                particles[ip] = update_landmark(particles[ip], z[:, iz], Q)\n", "\n", "    return particles\n", "\n", "def compute_weight(particle, z, Q):\n", "    lm_id = int(z[2])\n", "    xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)\n", "    Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2])\n", "    zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)\n", "    dx = z[0:2].reshape(2, 1) - zp\n", "    dx[1, 0] = pi_2_pi(dx[1, 0])\n", "\n", "    try:\n", "        invS = np.linalg.inv(Sf)\n", "    except np.linalg.LinAlgError:\n", "        print(\"singular\")\n", "        return 1.0\n", "\n", "    # Gaussian likelihood of the innovation; [0, 0] extracts the scalar from the 1x1 result\n", "    num = math.exp(-0.5 * (dx.T @ invS @ dx)[0, 0])\n", "    den = 2.0 * math.pi * math.sqrt(np.linalg.det(Sf))\n", "    w = num / den\n", "\n", "    return w\n", "\n", "def compute_jacobians(particle, xf, Pf, Q):\n", "    dx = xf[0, 0] - particle.x\n", "    dy = xf[1, 0] - particle.y\n", "    d2 = dx**2 + dy**2\n", "    d = math.sqrt(d2)\n", "\n", "    # predicted range and bearing to the landmark\n", "    zp = np.array(\n", "        [d, pi_2_pi(math.atan2(dy, dx) - particle.yaw)]).reshape(2, 1)\n", "\n", "    Hv = np.array([[-dx / d, -dy / d, 0.0],\n", "                   [dy / d2, -dx / d2, -1.0]])\n", "\n", "    Hf = np.array([[dx / d, dy / d],\n", "                   [-dy / d2, dx / d2]])\n", "\n", "    Sf = Hf @ Pf @ Hf.T + Q\n", "\n", "    return zp, Hv, Hf, Sf\n", "\n", "def add_new_lm(particle, z, Q):\n", "\n", "    r = z[0]\n", "    b = z[1]\n", "    lm_id = int(z[2])\n", "\n", "    s = math.sin(pi_2_pi(particle.yaw + b))\n", "    c = math.cos(pi_2_pi(particle.yaw + b))\n", "\n", "    particle.lm[lm_id, 0] = particle.x + r 
* c\n", "    particle.lm[lm_id, 1] = particle.y + r * s\n", "\n", "    # covariance\n", "    Gz = np.array([[c, -r * s],\n", "                   [s, r * c]])\n", "\n", "    particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q @ Gz.T\n", "\n", "    return particle\n", "\n", "def update_KF_with_cholesky(xf, Pf, v, Q, Hf):\n", "    PHt = Pf @ Hf.T\n", "    S = Hf @ PHt + Q\n", "\n", "    S = (S + S.T) * 0.5\n", "    SChol = np.linalg.cholesky(S).T\n", "    SCholInv = np.linalg.inv(SChol)\n", "    W1 = PHt @ SCholInv\n", "    W = W1 @ SCholInv.T\n", "\n", "    x = xf + W @ v\n", "    P = Pf - W1 @ W1.T\n", "\n", "    return x, P\n", "\n", "def update_landmark(particle, z, Q):\n", "\n", "    lm_id = int(z[2])\n", "    xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)\n", "    Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2, :])\n", "\n", "    zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)\n", "\n", "    dz = z[0:2].reshape(2, 1) - zp\n", "    dz[1, 0] = pi_2_pi(dz[1, 0])\n", "\n", "    xf, Pf = update_KF_with_cholesky(xf, Pf, dz, Q, Hf)\n", "\n", "    particle.lm[lm_id, :] = xf.T\n", "    particle.lmP[2 * lm_id:2 * lm_id + 2, :] = Pf\n", "\n", "    return particle\n", "\n", "# END OF CODE SNIPPET #\n", "\n", "\n", "\n", "# Setting up the landmarks\n", "RFID = np.array([[10.0, -2.0],\n", "                 [15.0, 10.0]])\n", "N_LM = RFID.shape[0]\n", "\n", "# Initialize 1 particle\n", "N_PARTICLE = 1\n", "particles = [Particle(N_LM) for i in range(N_PARTICLE)]\n", "\n", "xTrue = np.zeros((STATE_SIZE, 1))\n", "xDR = np.zeros((STATE_SIZE, 1))\n", "\n", "print(\"initial weight\", particles[0].w)\n", "\n", "xTrue, z, _, ud = observation(xTrue, xDR, u, RFID)\n", "# Initialize landmarks\n", "particles = update_with_observation(particles, z)\n", "print(\"weight after landmark intialization\", particles[0].w)\n", "particles = update_with_observation(particles, z)\n", "print(\"weight after update \", particles[0].w)\n", "\n", "particles[0].x = -10\n", "particles = update_with_observation(particles, z)\n", "print(\"weight after wrong prediction\", particles[0].w)\n", " " ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "### 3- Resampling\n", "\n", "In the reseampling steps a new set of particles are chosen from the old set. This is done according to the weight of each particle. \n", "\n", "The figure shows 100 particles distributed uniformly between [-0.5, 0.5] with the weights of each particle distributed according to a Gaussian funciton. \n", "\n", "The resampling picks \n", "\n", "$i \\in 1,...,N$ particles with probability to pick particle with index $i ∝ \\omega_i$, where $\\omega_i$ is the weight of that particle\n", "\n", "\n", "To get the intuition of the resampling step we will look at a set of particles which are initialized with a given x location and weight. After the resampling the particles are more concetrated in the location where they had the highest weights. This is also indicated by the indices \n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# CODE SNIPPET #\n", "def normalize_weight(particles):\n", "\n", " sumw = sum([p.w for p in particles])\n", "\n", " try:\n", " for i in range(N_PARTICLE):\n", " particles[i].w /= sumw\n", " except ZeroDivisionError:\n", " for i in range(N_PARTICLE):\n", " particles[i].w = 1.0 / N_PARTICLE\n", "\n", " return particles\n", "\n", " return particles\n", "\n", "\n", "def resampling(particles):\n", " \"\"\"\n", " low variance re-sampling\n", " \"\"\"\n", "\n", " particles = normalize_weight(particles)\n", "\n", " pw = []\n", " for i in range(N_PARTICLE):\n", " pw.append(particles[i].w)\n", "\n", " pw = np.array(pw)\n", "\n", " Neff = 1.0 / (pw @ pw.T) # Effective particle number\n", " # print(Neff)\n", "\n", " if Neff < NTH: # resampling\n", " wcum = np.cumsum(pw)\n", " base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE\n", " resampleid = base + np.random.rand(base.shape[0]) / N_PARTICLE\n", "\n", " inds = []\n", " ind = 0\n", " for ip in range(N_PARTICLE):\n", " while ((ind < wcum.shape[0] - 1) and (resampleid[ip] > wcum[ind])):\n", " ind += 1\n", " inds.append(ind)\n", "\n", " tparticles = particles[:]\n", " for i in range(len(inds)):\n", " particles[i].x = tparticles[inds[i]].x\n", " particles[i].y = tparticles[inds[i]].y\n", " particles[i].yaw = tparticles[inds[i]].yaw\n", " particles[i].w = 1.0 / N_PARTICLE\n", "\n", " return particles, inds\n", "# END OF SNIPPET #\n", "\n", "\n", "\n", "def gaussian(x, mu, sig):\n", " return np.exp(-np.power(x - mu, 2.) 
/ (2 * np.power(sig, 2.)))\n", "N_PARTICLE = 100\n", "particles = [Particle(N_LM) for i in range(N_PARTICLE)]\n", "x_pos = []\n", "w = []\n", "for i in range(N_PARTICLE):\n", "    particles[i].x = np.linspace(-0.5,0.5,N_PARTICLE)[i]\n", "    x_pos.append(particles[i].x)\n", "    particles[i].w = gaussian(i, N_PARTICLE/2, N_PARTICLE/20)\n", "    w.append(particles[i].w)\n", "\n", "\n", "# Normalize weights\n", "sw = sum(w)\n", "for i in range(N_PARTICLE):\n", "    w[i] /= sw\n", "\n", "particles, new_indices = resampling(particles)\n", "x_pos2 = []\n", "for i in range(N_PARTICLE):\n", "    x_pos2.append(particles[i].x)\n", "\n", "# Plot results\n", "fig, ((ax1,ax2,ax3)) = plt.subplots(nrows=3, ncols=1)\n", "fig.tight_layout()\n", "ax1.plot(x_pos,np.ones((N_PARTICLE,1)), '.r', markersize=2)\n", "ax1.set_title(\"Particles before resampling\")\n", "ax1.axis((-1, 1, 0, 2))\n", "ax2.plot(w)\n", "ax2.set_title(\"Weights distribution\")\n", "ax3.plot(x_pos2,np.ones((N_PARTICLE,1)), '.r')\n", "ax3.set_title(\"Particles after resampling\")\n", "ax3.axis((-1, 1, 0, 2))\n", "fig.subplots_adjust(hspace=0.8)\n", "plt.show()\n", "\n", "plt.figure()\n", "plt.hist(new_indices)\n", "plt.xlabel(\"Particle indices to be resampled\")\n", "plt.ylabel(\"# of times index is used\")\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### References\n", "\n", "http://www.probabilistic-robotics.org/\n", "\n", "http://ais.informatik.uni-freiburg.de/teaching/ws12/mapping/pdf/slam10-fastslam.pdf" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }