{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Welcome to the Prognostics Algorithms Package Tutorial" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The goal of this notebook is to instruct the user on how to use and extend the NASA Python Prognostics Algorithms Package. \n", "\n", "First some background. The Prognostics Algorithms Package (`prog_algs`) contains tools for performing prognostics (event prediction) using the Prognostics Models Package. `prog_algs` also includes tools for analyzing the performance of prognostics algorithms. \n", "\n", "A few definitions:\n", "* state estimation: The process of estimating the (possibly hidden) state of a system given sensor information on observable states\n", "* prediction: The process of predicting the evolution of a system state with time and the occurrence of events. \n", "\n", "The `prog_algs` package has the following primary subpackages\n", "* `prog_algs.state_estimators` - Tools for performing state estimation\n", "* `prog_algs.predictors` - Tools for performing prediction\n", "* `prog_algs.uncertain_data` - Tools for representing data with uncertainty\n", "\n", "In addition to the `prog_algs` package, this repo includes examples showing how to use the package (see `examples/`), a template for implementing a new state estimator (`state_estimator_template`), a template for implementing a new predictor (`predictor_template`), documentation (), and this tutorial (`tutorial.ipynb`).\n", "\n", "Before you start, install `prog_algs` using pip:\n", "\n", " `pip install prog_algs`\n", "\n", "or, to use the pre-release, close from GitHub and checkout the dev branch. Then run the following command:\n", " `pip install -e .`\n", "\n", "Now let's get started with some examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## UncertainData - Representing a Distribution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Uncertainty is sometimes present in data used for performing state estimations or making predictions.\n", "\n", "In `prog_algs`, data with uncertainty is represented using classes inheriting from `UncertainData`:\n", "* `prog_algs.uncertain_data.MultivariateNormalDist` - Data represented by a multivariate normal distribution with mean and covariance matrix\n", "* `prog_algs.uncertain_data.ScalarData` - Data without uncertainty, a single value\n", "* `prog_algs.uncertain_data.UnweightedSamples` - Data represented by a set of unweighted samples. Objects of this class can be treated like a list where samples[n] returns the nth sample (Dict)\n", "\n", "To begin using `UncertainData`, import the type that best portrays the data. In this simple demonstration, we import the `UnweightedSamples` data type. See for full details on the available `UncertainData` types." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prog_algs.uncertain_data import UnweightedSamples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With `UnweightedSamples` imported, construct an object with samples. This object can be initialized using either a dictionary, list, or model.*Container type from prog_models (e.g., StateContainer). Let's try creating an object using a dictionary. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "samples = UnweightedSamples([{'x': 1, 'v':2}, {'x': 3, 'v':-2}])\n", "print(samples)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given an integer value, addition and subtraction can be performed on the `UncertainData` classes to adjust the distribution by a scalar amount." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "samples = samples + 5\n", "print(samples)\n", "samples -= 3\n", "print(samples)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also sample from any `UncertainData` distribution using the `sample` method. In this case it resamples from the existing samples" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(samples.sample()) # A single sample\n", "print(samples.sample(10)) # 10 samples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see the keys present using the `.keys()` method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(samples.keys())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and the data corresponding to a specific key can be retrieved using `.key()`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(samples.key('x'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Various properties are available to quantify the `UncertainData` distribution" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('mean', samples.mean)\n", "print('median', samples.median)\n", "print('covariance', samples.cov)\n", "print('size', samples.size)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "These `UncertainData` classes are used throughout the prog_algs package to represent data with uncertainty, as described in the following sections." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## State Estimation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "State estimation is the process of estimating the system state given sensor data and a model. Typically, this is done repeatedly as new sensor data is available.\n", "\n", "In `prog_algs` a State Estimator is used to estimate the system state. \n", "\n", "To start, import the needed packages. Here we will import the `BatteryCircuit` model and the `UnscentedKalmanFilter` state estimator. See for more details on the available state estimators.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prog_models.models import BatteryCircuit\n", "from prog_algs.state_estimators import UnscentedKalmanFilter" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we construct and initialize the model. \n", "\n", "We use the resulting model and initial state to construct the state estimator. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "m = BatteryCircuit()\n", "x0 = m.initialize()\n", "\n", "# Turn into a distribution - this represents uncertainty in the initial state\n", "from prog_algs.uncertain_data import MultivariateNormalDist\n", "from numpy import diag\n", "INITIAL_UNCERT = 0.05 # Uncertainty in initial state (%)\n", "# Construct covariance matrix (making sure each value is positive)\n", "cov = diag([max(abs(INITIAL_UNCERT * value), 1e-9) for value in x0.values()])\n", "x0 = MultivariateNormalDist(x0.keys(), x0.values(), cov)\n", "\n", "# Construct State estimator\n", "est = UnscentedKalmanFilter(m, x0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can use the estimator to estimate the system state." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Prior State:\", est.x.mean)\n", "print('\\tSOC: ', m.event_state(est.x.mean)['EOD'])\n", "fig = est.x.plot_scatter(label='prior')\n", "\n", "t = 0.1\n", "u = m.InputContainer({'i': 2})\n", "example_measurements = m.OutputContainer({'t': 32.2, 'v': 3.915})\n", "est.estimate(t, u, example_measurements)\n", "\n", "print(\"Posterior State:\", est.x.mean)\n", "print('\\tSOC: ', m.event_state(est.x.mean)['EOD'])\n", "est.x.plot_scatter(fig= fig, label='posterior')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As mentioned previously, this step is typically repeated when there's new data. filt.x may not be accessed every time the estimate is updated, only when it's needed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prediction Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Prediction is the practice of using a state estimation, a model, and estimates of future loading to predict future states and when an event will occur.\n", "\n", "First we will import a predictor. In this case, we will use the MonteCarlo Predictor, but see documentation for a full list of predictors and their configuration parameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prog_algs.predictors import MonteCarlo" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we initialize it using the model from the above example" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mc = MonteCarlo(m)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's define future loading and the first state. The first state is the output of the state estimator, and the future loading scheme is a simple piecewise function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x = est.x # The state estimate\n", "\n", "def future_loading(t, x={}):\n", " # Variable (piece-wise) future loading scheme \n", " if (t < 600):\n", " i = 2\n", " elif (t < 900):\n", " i = 1\n", " elif (t < 1800):\n", " i = 4\n", " elif (t < 3000):\n", " i = 2\n", " else:\n", " i = 3\n", " return m.InputContainer({'i': i})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's use the constructed mc predictor to perform a single prediction. Here we're setting dt to 0.25. Note this may take up to a minute" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mc_results = mc.predict(x, future_loading, dt=0.25, n_samples=20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The predict function returns predictions of future inputs, states, outputs, and event_states at each save point. For sample-based predictors like the monte carlo, these can be accessed like an array with the format `[sample #][time]` so that `mc_results.states[m][n]` corresponds to the state for sample `m` at time `mc_results.times[m][n]`. Alternately, use the method `snapshot` to get a single point in time. e.g., \n", "\n", " `state = mc_results.states.snapshot(3)`\n", "\n", "In this case the state snapshot corresponds to time `mc_results.times[3]`. The snapshot method returns type UncertainData. \n", "\n", "The `predict` method also returns Time of Event (ToE) as a type UncertainData, representing the predicted time of event (for each event predicted), with uncertainty.\n", "\n", "Next, let's use the metrics package to analyze the ToE" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"\\nEOD Predictions (s):\")\n", "print('\\tPortion between 3005.2 and 3005.6: ', mc_results.time_of_event.percentage_in_bounds([3005.2, 3005.6]))\n", "print('\\tAssuming ground truth 3005.25: ', mc_results.time_of_event.metrics(ground_truth = 3005.25))\n", "from prog_algs.metrics import prob_success \n", "print('\\tP(Success) if mission ends at 3005.25: ', prob_success(mc_results.time_of_event, 3005.25))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "These analysis methods applied to ToE can also be applied to anything of type UncertainData (e.g., state snapshot). \n", "\n", "You can also visualize the results in a variety of different ways. For example, state transition" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig = mc_results.states.snapshot(0).plot_scatter(label = \"t={:.0f}\".format(int(mc_results.times[0])))\n", "for i in range(1, 4):\n", " index = int(len(mc_results.times)/4*i)\n", " mc_results.states.snapshot(index).plot_scatter(fig=fig, label = \"t={:.0f}\".format(mc_results.times[index]))\n", "mc_results.states.snapshot(-1).plot_scatter(fig = fig, label = \"t={:.0f}\".format(int(mc_results.times[-1])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or time of event (ToE)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig = mc_results.time_of_event.plot_hist()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note, for this event, there is only one event (End of Discharge). Many models have multiple events that can be predicted. For these models, ToE for each event is returned and can be analyzed.\n", "\n", "Alternately, a specific event (or events) can be specified for prediction. See `examples.predict_specific_event` for more details." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Frequently the prediction step is run periodically, less often than the state estimator step" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Extending - Adding a new state estimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "New state estimators can be created by extending the state_estimator interface. As an example lets use a really dumb state estimator that adds random noise each step - and accepts the state that is closest. \n", "\n", "First thing we need to do is import the StateEstimator parent class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prog_algs.state_estimators.state_estimator import StateEstimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we select how state will be represented. In this case there's no uncertainty- it's just one state, so we represent it as a scaler. Import the appropriate class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prog_algs.uncertain_data import ScalarData" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we construct the class, implementing the functions of the state estimator template (`state_estimator_template.py`)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random \n", "\n", "class BlindlyStumbleEstimator(StateEstimator):\n", " def __init__(self, model, x0):\n", " self.m = model\n", " self.state = x0\n", "\n", " def estimate(self, t, u, z):\n", " # Generate new candidate state\n", " x2 = {key : float(value) + 10*(random.random()-0.5) for (key,value) in self.state.items()}\n", "\n", " # Calculate outputs\n", " z_est = self.m.output(self.state)\n", " z_est2 = self.m.output(x2)\n", "\n", " # Now score them each by how close they are to the measured z\n", " z_est_score = sum([abs(z_est[key] - z[key]) for key in self.m.outputs])\n", " z_est2_score = sum([abs(z_est2[key] - z[key]) for key in self.m.outputs])\n", "\n", " # Now choose the closer one\n", " if z_est2_score < z_est_score: \n", " self.state = x2\n", "\n", " @property\n", " def x(self):\n", " return ScalarData(self.state)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Great, now let's try it out using the model from earlier. with an initial state of all 0s. It should slowly converge towards the correct state" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "x0 = {key: 0 for key in m.states}\n", "se = BlindlyStumbleEstimator(m, x0)\n", "\n", "for i in range(25):\n", " u = m.InputContainer({'i': 0})\n", " z = m.OutputContainer({'t': 18.95, 'v': 4.183})\n", " se.estimate(i, u, z)\n", " print(se.x.mean)\n", " print(\"\\tcorrect: {'tb': 18.95, 'qb': 7856.3254, 'qcp': 0, 'qcs': 0}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Extending - Adding a new Predictor" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Like the example above with StateEstimators, Predictors can be extended by subclassing the Predictor class. Copy `predictor_template.py` as a starting point." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion\n", "This is just the basics, there's much more to learn. Please see the documentation at and the examples in the `examples/` folder for more details on how to use the package, including:\n", "* `examples.basic_example` : A basic Example using prog_algs for Prognostics \n", "* `examples.benchmarking_example` : An example benchmarking the performance of prognostics algorithms\n", "* `examples.eol_event` : An example where a model has multiple events, but the user is only interested in predicting the time when the first event occurs (whatever it is).\n", "* `examples.measurement_eqn_example` : An example where not every output is measured or measurements are not in the same format as outputs, so a measurement equation is defined to translate between outputs and what's measured. \n", "* `examples.new_state_estimator_example` : An example of extending StateEstimator to create a new state estimator class\n", "* `examples.playback` : A full example performing prognostics using playback data.\n", "* `examples.predict_specific_event` : An example where the model has multiple events, but the user is only interested in predicting a specific event (or events).\n", "* `examples.thrown_object_example` : An example performing prognostics with the simplified ThrownObject model\n", "* `examples.utpredictor` : An example using the Unscented Transform Predictor for prediction.\n", "\n", "Thank you for trying out this tutorial. Open an issue on github () or email Chris Teubert (christopher.a.teubert@nasa.gov) with any questions or issues." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright © 2021 United States Government as represented by the Administrator of the National Aeronautics and Space Administration. All Rights Reserved." ] } ], "metadata": { "interpreter": { "hash": "ff94885aa2d97705a9dae03869c2058fa855d1acd9df351499300343e2e591a2" }, "kernelspec": { "display_name": "Python 3.8.13 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "metadata": { "interpreter": { "hash": "ff94885aa2d97705a9dae03869c2058fa855d1acd9df351499300343e2e591a2" } }, "orig_nbformat": 2 }, "nbformat": 4, "nbformat_minor": 2 }