{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Mass-Spring-Damper Model Estimation\n", "-----------------------------------" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### System Description" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a really simple model: A 1-DOF mass-spring-damper system." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from IPython.display import Image\n", "\n", "Image(url='https://github.com/stuckeyr/msd/raw/master/mass_spring_damper.png')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See [here](http://ctms.engin.umich.edu/CTMS/index.php?example=Introduction§ion=SystemModeling#5) for a more detailed explanation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The equations of motion can be written:\n", "\n", "$$\n", "\\sum F_x = F(t) - b \\dot{x} - kx = m \\ddot{x}\n", "$$\n", "\n", "where $x$ is the displacement, $m$ is the mass, $k$ is the spring constant, $b$ is the damping constant and $F$ is the input force, expressed here as a function of time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Reframing the system in state-space form, using the state vector:\n", "\n", "$$\n", "\\mathbf{x} = \\begin{bmatrix} x \\\\\\ \\dot{x} \\end{bmatrix}\n", "$$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "the equations are written:\n", "\n", "$$\n", "m \\mathbf{\\dot{x}} = \\begin{bmatrix} \\dot{x} \\\\\\ \\ddot{x} \\end{bmatrix} = \\begin{bmatrix} 0 & m \\\\\\ -k & -b \\end{bmatrix} \\begin{bmatrix} x \\\\\\ \\dot{x} \\end{bmatrix} + \\begin{bmatrix} 0 \\\\\\ 1 \\end{bmatrix} F(t)\n", "$$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simulation in Python " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, reset the workspace. Import all necessary libraries (the following assumes you have built both Boost and Cython models)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib notebook\n", "# %matplotlib inline" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "%reset -f" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import numpy.matlib as ml\n", "from scipy import interpolate, integrate" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the model. Below is the pure Python version, which I have included for information." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# ------------------------------------------------------------------------------\n", "# MSD class\n", "# ------------------------------------------------------------------------------\n", "class MSD(object):\n", " \"\"\"\n", " The MSD class represents a Mass-Spring-Damper system.\n", " \"\"\"\n", " # System parameters\n", " m = 30.48\n", "\n", " def __init__(self, name, **kwargs):\n", " \"\"\"\n", " Initialise the msd object.\n", "\n", " :param: name = system name\n", " \"\"\"\n", " self.name = name\n", "\n", " # Pass through any other keyword arguments\n", " for key in kwargs:\n", " self.__dict__[key] = kwargs[key]\n", "\n", " self.c_idx = [ 'k', 'b', 'd' ]\n", "\n", " # Model coefficients\n", " self.C = { 'k': -50.0, 'b': -10.0, 'd': 1.0, 'z': 0.0 }\n", "\n", " # External force function\n", " self.d_func = None\n", "\n", " # State noise function\n", " self.w_func = None\n", "\n", " self.init()\n", "\n", " def __str__(self):\n", " return self.name\n", "\n", " def init(self):\n", " \"\"\"\n", " Construct the force and moment matrices.\n", " \"\"\"\n", " # Rigid body mass matrix\n", " self.C_M_I = 1.0/self.m\n", "\n", " self.C_SD = ml.zeros((2, 2))\n", " self.M_EF = ml.zeros((2, 1))\n", "\n", " def get_coeffs(self):\n", " \"\"\"\n", " Get the model coefficients.\n", " \"\"\"\n", " return self.C\n", "\n", " def set_coeffs(self, C):\n", " \"\"\"\n", " Set the model coefficients.\n", " \"\"\"\n", " for ck in C.keys():\n", " self.C[ck] = C[ck]\n", "\n", " def set_external_forces(self, T_S, D_S, interp_kind):\n", " \"\"\"\n", " Set the external force interpolant points.\n", " \"\"\"\n", " self.d_func = interpolate.interp1d(T_S, D_S, kind=interp_kind, axis=0, bounds_error=False)\n", "\n", " def add_state_noise(self, T_S, W_S):\n", " \"\"\"\n", " Set the state noise interpolant points.\n", " \"\"\"\n", " self.w_func = interpolate.interp1d(T_S, W_S, kind='linear', axis=0, bounds_error=False)\n", "\n", " def rates(self, x, t):\n", " \"\"\"\n", " Calculate the system state-rate for the current state x.\n", "\n", " :param: x = current system state [ xp, xpd ]\n", " :param: t = current time\n", "\n", " :returns: xdot = system state-rate\n", " \"\"\"\n", " # Spring-damper forces\n", " # C_SD = np.mat([[ 0.0, self.m ],\n", " # [ self.C['k'], self.C['b']]])\n", " self.C_SD[0, 0] = 0.0\n", " self.C_SD[0, 1] = self.m\n", " self.C_SD[1, 0] = self.C['k']\n", " self.C_SD[1, 1] = self.C['b']\n", "\n", " M_SD = self.C_SD*x.reshape((-1, 1))\n", "\n", " d = np.nan_to_num(self.d_func(t))\n", "\n", " # External force\n", " # M_EF = np.mat([[ 0.0 ],\n", " # [ self.C['d']*f ]])\n", " self.M_EF[0, 0] = 0.0\n", " self.M_EF[1, 0] = self.C['d']*d\n", "\n", " xdot = np.ravel(self.C_M_I*(M_SD + self.M_EF))\n", "\n", " if (self.w_func is not None):\n", " xdot[1] += np.nan_to_num(self.w_func(t))\n", "\n", " return xdot\n", "\n", " def rrates(self, t, x):\n", " \"\"\"\n", " Rates method with arguments reversed.\n", " \"\"\"\n", " return self.rates(x, t, self.d_func)\n", "\n", " def forces(self, xdot, x):\n", " \"\"\"\n", " Calculate the forces from recorded state data.\n", "\n", " :param: xdot = system state rate\n", " :param: x = system state [ xp, xpd ]\n", "\n", " :returns: f = state forces\n", " \"\"\"\n", " xpddot = xdot[1]\n", "\n", " f = self.m*xpddot\n", "\n", " return f\n", "\n", " def integrate(self, x0, T):\n", " \"\"\"\n", " Integrate the differential equations and calculate the resulting rates and forces.\n", "\n", " :param: x0 = initial system state\n", " :param: T = sequence of time points for which to solve for x\n", "\n", " :returns: X = system state array\n", " :returns: Xdot = state rates array\n", " :returns: F = state force array\n", " \"\"\"\n", " N = T.shape[0]\n", "\n", " # Initialise the model\n", " self.init()\n", "\n", " # Perform the integration\n", " X = integrate.odeint(self.rates, x0, T, rtol=1.0e-6, atol=1.0e-6)\n", "\n", " Xdot = np.zeros((N, len(x0)))\n", " for n in range(N):\n", " Xdot[n] = self.rates(X[n], T[n])\n", "\n", " # Force and moment matrix\n", " F = np.zeros((N, 1))\n", " for n in range(N):\n", " F[n] = self.forces(Xdot[n], X[n])\n", "\n", " return X, Xdot, F" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But we are going to use the Cython implementation here, as the Python model is very slow!. Simulate and add (Gaussian) noise to the output. 1) * 1.0 for _ in range(T_S0.shape[0] - 2) ]), d0)
interpfun = interpolate.interp1d(T_S0, D_S0, kind='zero', axis=0, bounds_error=False)
D0 = np.array([ [ interpfun(t) ] for t in T ])
T_S = T_S0.copy()
D_S = D_S0.copy()
D = D0.copy()

# Create the simulation model
if (MODEL == 'python'):
    # Pure Python
    msd = MSD("Mass-Spring-Damper (Python)")
    msd.set_external_forces(T_S, D_S, 'zero')
elif (MODEL == 'cython'):
    # Cython
    msd = MSD_CYTHON("Mass-Spring-Damper (Cython)")
    msd.set_external_forces(T_S, D_S, 'zero')
elif (MODEL == 'pyublas'):
    # PyUblas extension
    msd = MSD_PYUBLAS("Mass-Spring-Damper (PyUblas)", N)
    msd.set_external_forces(T_S, D_S, 'zero')
elif (MODEL == 'numba'):
    # Numba JIT
    msd = MSD_NUMBA("Mass-Spring-Damper (Numba)", N)
    msd.set_external_forces(T_S, D_S, 'zero')
elif (MODEL == 'numba_jc'):
    # Numba JIT
    msd = MSD_NUMBA_JC(N)
    msd.set_external_forces(T_S, D_S, 0)
elif (MODEL == 'boost'):
    # Boost extension
    msd = MSD_BOOST("Mass-Spring-Damper (Boost)", N)
    msd.set_external_forces(T_S, D_S, 'zero')

# Identification keys
c_idx = ['k', 'b', 'd']

# True parameter set
CT = [ msd.get_coeffs()[ck] for ck in c_idx ]

# Initial parameter set
C0 = [ 0.5*msd.get_coeffs()[ck] for ck in c_idx ]

# Add any state noise
if (STATE_NOISE_SD > 0.0):
    sdw = STATE_NOISE_SD
    W = np.random.randn(N, 1)*sdw
    msd.add_state_noise(T, W)

# Compute the response
if (MODEL in ['python', 'cython', 'pyublas', 'numba', 'boost']):
    X, Xdot, F = msd.integrate(x0, T)
elif (MODEL == 'numba_jc'):
    X, Xdot, F = msd_integrate(msd, x0, T)

# State noise standard deviation vector
sdz = np.zeros((len(x0),))
if any(w > 0.0 for w in NOISE_SD[:2]):
    sdz = np.array(NOISE_SD[:2])

# Measured state matrix
Z = X + np.random.randn(N, len(x0))*sdz

# Set the initial measured state equal to the initial true state
z0 = x0

Nu = Z[:,1]
sdnu = sdz[1]

# State rate noise standard deviation vector
sdzdot = np.zeros((len(x0),))
if any(w > 0.0 for w in NOISE_SD[:2]):
    sdzdot = np.array(NOISE_SD[:2])

# Measured state rate matrix
Zdot = Xdot + np.random.randn(N, len(x0))*sdzdot

# External force noise standard deviation vector
sde = 0.0
if (NOISE_SD[2] > 0.0):
    sde = NOISE_SD[2]

# Measured external force matrix
E = D + np.random.randn(N, 1)*sde

# Set the initial measured external force equal to the initial true external force
e0 = d0

# Compute the inertial force and noise standard deviation
G = F.copy()
sdg = 0.0
if any(w > 0.0 for w in NOISE_SD):
    # Forces are calculated from (measured) accelerations, not measured directly
    for n in range(N):
        G[n] = msd.forces(Zdot[n], Z[n])
    sdg = np.std(F - G) This is primarily due to the fact that the error in the independant variables is introducing bias into the LS estimate.

Let's see how the same model compares with a different response dataset. Although it did take some time. And the system response is quite good too. Let's check again using a different input and response dataset." ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "C_PM0 = C_PM" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "T_S = T_S1.copy()\n", "D_S = D_S1.copy()\n", "D = D1.copy()" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "%run -i sim.py" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "msd_est.set_external_forces(T, E, 'linear_uniform')\n", "\n", "for i in range(len(c_idx)):\n", " msd_est.set_coeffs({ 'k': C_PM0[0], 'b': C_PM0[1], 'd': C_PM0[2] })\n", "\n", "# Compute the response\n", "if (MODEL in ['python', 'cython', 'pyublas', 'numba', 'boost']):\n", " Xe, Xedot, Fe = msd_est.integrate(z0, T)\n", "elif (MODEL == 'numba_jc'):\n", " Xe, Xedot, Fe = msd_integrate(msd_est, z0, T)" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "application/javascript": [ "/* Put everything inside the global mpl namespace */\n", "window.mpl = {};\n", "\n", "\n", "mpl.get_websocket_type = function() {\n", " if (typeof(WebSocket) !== 'undefined') {\n", " return WebSocket;\n", " } else if (typeof(MozWebSocket) !== 'undefined') {\n", " return MozWebSocket;\n", " } else {\n", " alert('Your browser does not have WebSocket support.' +\n", " 'Please try Chrome, Safari or Firefox ≥ 6. this is a bit hacky, but it is done because the \"notebook_saving.Notebook\"\n", " // IPython event is triggered only after the cells have been serialised, which for\n", " // our purposes (turning an active figure into a static one), is too late.\n", " var cells = IPython.notebook.get_cells();\n", " var ncells = cells.length;\n", " for (var i=0; i= 3 moved mimebundle to data attribute of output\n", " data = data.data;\n", " }\n", " if (data['text/html'] == html_output) {\n", " return [cell, data, j];\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n", "// Register the function which deals with the matplotlib target/channel.\n", "// The kernel may be null if the page has been refreshed.\n", "if (IPython.notebook.kernel != null) {\n", " IPython.notebook.kernel.comm_manager.register_target('matplotlib', mpl.mpl_figure_comm);\n", "}\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "LEVENBERG-MARQUARDT OPTIMIZATION:\n", "Time elapsed: 5.299416 seconds\n", "\n", " TRUE F_EST\n", "k : -50.0000 -49.5020\n", "b : -10.0000 -9.0243\n", "d : 1.0000 0.9791\n" ] } ], "source": [ "FF = ml.repmat(None, 50, 1)\n", "\n", "# Create the simulation model\n", "if (MODEL == 'python'):\n", " # Pure Python\n", " msd_fest = MSD(\"Mass-Spring-Damper_FMIN_EST\")\n", " msd_fest.set_external_forces(T, E, 'linear_unifom')\n", "elif (MODEL == 'cython'):\n", " # Cython\n", " msd_fest = MSD_CYTHON(\"Mass-Spring-Damper_FMIN_EST (Cython)\")\n", " msd_fest.set_external_forces(T, E, 'linear_uniform')\n", "elif (MODEL == 'pyublas'):\n", " # PyUblas extension\n", " msd_fest = MSD_PYUBLAS(\"Mass-Spring-Damper_FMIN_EST (PyUblas)\", N)\n", " msd_fest.set_external_forces(T, E, 'linear_unifom')\n", "elif (MODEL == 'numba'):\n", " # Numba JIT\n", " msd_fest = MSD_NUMBA(\"Mass-Spring-Damper_FMIN_EST (Numba)\", N)\n", " msd_fest.set_external_forces(T, E, 'linear_unifom')\n", "elif (MODEL == 'numba_jc'):\n", " # Numba JIT\n", " msd_fest = MSD_NUMBA_JC(N)\n", " msd_fest.set_external_forces(T, E, 1)\n", "elif (MODEL == 'boost'):\n", " # Boost extension\n", " msd_fest = MSD_BOOST(\"Mass-Spring-Damper_FMIN_EST (Boost)\", N)\n", " msd_fest.set_external_forces(T, E, 'linear_uniform')\n", "\n", "fig, Axes, Lines, Text = plot(msd_fest.name, T, E, Z, G, Xe=np.zeros(X.shape), Fe=np.zeros(F.shape), FF=FF)\n", "fig.canvas.draw()\n", "\n", "kws = { 'fig': fig, 'Axes': Axes, 'Lines': Lines, 'Text': Text }\n", "\n", "c_idx = ['k', 'b', 'd']\n", "\n", "print(\"LEVENBERG-MARQUARDT OPTIMIZATION:\")\n", "\n", "class Fcn2min(object):\n", "\n", " def __init__(self, z0, T, G, FF):\n", " self.z0 = z0\n", " self.T = T\n", " self.G = G\n", " self.FF = FF\n", " self.fopt_max = None\n", " self.it = 0;\n", "\n", " def __call__(self, P, **kws):\n", " C = [ P[c_idx[i]].value for i in range(len(c_idx)) ]\n", " msd_fest.set_coeffs({ 'k': C[0], 'b': C[1], 'd': C[2] })\n", "\n", " # Compute the response\n", " if (MODEL in ['python', 'cython', 'pyublas', 'numba', 'boost']):\n", " Xe, Xedot, Fe = msd_fest.integrate(self.z0, self.T)\n", " elif (MODEL == 'numba_jc'):\n", " Xe, Xedot, Fe = msd_integrate(msd_fest, self.z0, self.T)\n", "\n", " fopt = np.ravel(self.G - Fe)\n", " fopt_sum = np.sum(fopt*fopt)\n", "\n", " if (self.it < np.size(FF, 0)):\n", " self.FF[self.it, 0] = fopt_sum\n", " else:\n", " self.FF = np.roll(self.FF, -1)\n", " self.FF[-1, 0] = fopt_sum\n", "\n", " f_max = None\n", " if ((self.fopt_max is None) or (self.fopt_max < fopt_sum)):\n", " f_max = fopt_sum * 1.1\n", " self.fopt_max = fopt_sum\n", " f_txt = '{:.4f}'.format(fopt_sum)\n", "\n", " updateplot(kws['fig'], kws['Axes'], kws['Lines'], kws['Text'], Xe, Fe, self.FF, f_max=f_max, f_txt=f_txt, c_txt=C)\n", "\n", " self.it += 1\n", "\n", " return fopt\n", "\n", "# Create a set of Parameters\n", "P = lm.Parameters()\n", "for i in range(len(c_idx)):\n", " ck = c_idx[i]\n", " P.add(ck, value=C0[i])\n", "\n", "tic = time.clock()\n", "\n", "fcn2min = Fcn2min(z0, T, G, FF)\n", "\n", "# Do fit, here with leastsq model\n", "res = lm.minimize(fcn2min, P, kws=kws, method='leastsq', epsfcn=0.1)\n", "\n", "toc = time.clock() - tic\n", "print(\"Time elapsed: {:f} seconds\".format(toc))\n", "\n", "C_LM = [ res.params[c_idx[i]].value for i in range(len(c_idx)) ]\n", "\n", "print()\n", "print(\" TRUE F_EST\")\n", "for i in range(len(c_idx)):\n", " ck = c_idx[i]\n", " print(\"{:5s}: {:10.4f} {:10.4f}\".format(ck, msd.get_coeffs()[ck], C_LM[i]))\n", "\n", "msd_fest.set_coeffs({ 'k': C_LM[0], 'b': C_LM[1], 'd': C_LM[2] })\n", "\n", "# Compute the response\n", "if (MODEL in ['python', 'cython', 'pyublas', 'numba', 'boost']):\n", " Xe, Xedot, Fe = msd_fest.integrate(z0, T)\n", "elif (MODEL == 'numba_jc'):\n", " Xe, Xedot, Fe = msd_integrate(msd_fest, z0, T)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Getting close... and a lot faster!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Last, we're going to try to fit our model using Markov-Chain Monte Carlo (MCMC)." ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [], "source": [ "import pymc as mc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Build the model and run the sampler." ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "BUILDING PROBABILITY DISTRIBUTION MODEL\n" ] }, { "data": { "application/javascript": [ "/* Put everything inside the global mpl namespace */\n", "window.mpl = {};\n", "\n", "\n", "mpl.get_websocket_type = function() {\n", " if (typeof(WebSocket) !== 'undefined') {\n", " return WebSocket;\n", " } else if (typeof(MozWebSocket) !== 'undefined') {\n", " return MozWebSocket;\n", " } else {\n", " alert('Your browser does not have WebSocket support.' +\n", " 'Please try Chrome, Safari or Firefox ≥ 6. With a fast, accurate optimisation initially and more samples for the adaptive phase, we can get a good estimate of the posteriors." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.2" } }, "nbformat": 4, "nbformat_minor": 1 }