{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 9. Integrated Prognostics Example\n", "\n", "This section demonstrates how to build a fully integrated prognostics example with ProgPy. This is based on the prognostics example section from the __[2024 PHM Tutorial](2024PHMTutorial.ipynb)__ notebook, which was shared at the 2024 PHM Society Conference. We will go through each step from data preparation to setup to putting the example together to make predictions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Table of Contents\n", "\n", "* [Data Preparation](#Data-Preparation)\n", "* [Setting up for Prognostics](#Setting-up-for-Prognostics)\n", " * [Set up Model](#Set-up-Model)\n", " * [Set up State Estimator](#Set-up-State-Estimator)\n", " * [Set up Predictor](#Set-up-Predictor)\n", "* [Prognostics Example](#Prognostics-Example)\n", "* [Conclusion](#Conclusion)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Preparation\n", "First, we need to download the data. To do this, we will use the datasets subpackage in ProgPy. Note that this downloads the battery data from the [PCoE datasets](https://www.nasa.gov/intelligent-systems-division/discovery-and-systems-health/pcoe/pcoe-data-set-repository)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.datasets import nasa_battery\n", "\n", "(desc, data) = nasa_battery.load_data(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's examine the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(desc[\"description\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset includes 4 different kinds of runs: trickle, step, reference, and random walk. For this example, we will use the trickle dataset. \n", "\n", "The dataset also includes 4 columns: `relativeTime`, `current`, `voltage`, and `temperature`. `relativeTime` is the time in a specific \"run\" (i.e., with one current draw). To use the random walk dataset, we need to concatenate multiple runs. To do this, we add a new column, `absoluteTime`, which shows time in the dataset (instead of run)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data[35][\"absoluteTime\"] = data[35][\"relativeTime\"]\n", "\n", "for i in range(36, 50):\n", " data[i][\"absoluteTime\"] = (\n", " data[i][\"relativeTime\"] + data[i - 1][\"absoluteTime\"].iloc[-1]\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we combine the data into a single dataset and investigate the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "random_walk_dataset = pd.concat(data[35:50], ignore_index=True)\n", "print(random_walk_dataset)\n", "\n", "fig = random_walk_dataset.plot(\n", " y=[\"current\", \"voltage\", \"temperature\"],\n", " subplots=True,\n", " xlabel=\"time (sec)\",\n", " title=\"Random Walk Trickle Data\",\n", ")\n", "fig[0].set_ylabel(\"current (A)\")\n", "fig[1].set_ylabel(\"voltage (V)\")\n", "fig[2].set_ylabel(\"temperature (K)\")\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the data is ready for this tutorial, let's dive into it." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setting up for Prognostics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To illustrate how to do prognostics, we will use the [BatteryElectrochemEOD](https://nasa.github.io/progpy/api_ref/progpy/IncludedModels.html#:~:text=class%20progpy.models.BatteryElectroChemEOD(**kwargs)) model. This model predicts the end-of-discharge of a Lithium-ion battery based on a set of differential equations that describe the electrochemistry of the system in [Daigle et al. 2013](https://papers.phmsociety.org/index.php/phmconf/article/view/2252). For more information on using model, refer to the relevant section in __[03 Included Models](03_Existing%20Models.ipynb)__.\n", "\n", "### Set up Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, let's import and initialize the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.models import BatteryElectroChemEOD\n", "\n", "batt: BatteryElectroChemEOD = BatteryElectroChemEOD()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will also update the `Ro` and `qMobile` parameters to better represent the age of the battery. See __[02 Parameter Estimation](02_Parameter%20Estimation.ipynb)__ for examples on how to estimate model parameters. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batt[\"Ro\"] = 0.15\n", "batt[\"qMobile\"] = 7750" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up State Estimator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The two basic components of prognostics are [state estimation and prediction](https://nasa.github.io/progpy/prog_algs_guide.html#state-estimation-and-prediction-guide). ProgPy includes functionality to do both. See __[07 State Estimation](07_State%20Estimation.ipynb)__ and __[08 Prediction](08_Prediction.ipynb)__ for examples of this. First, let's setup our state estimator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from progpy.state_estimators import ParticleFilter\n", "from progpy.uncertain_data import MultivariateNormalDist" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "State estimators require an initial state. To define this, we'll first initialize the model and then define the initial state as a distribution of possible states around this using a multi-variate normal distribution. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "initial_state = batt.initialize() # Initialize model\n", "\n", "# Define distribution around initial state\n", "x_guess = MultivariateNormalDist(\n", " labels=initial_state.keys(),\n", " mean=initial_state.values(),\n", " covar=np.diag([max(1e-9, abs(x)) for x in initial_state.values()]),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With our initial distribution defined, we can now instantiate the state estimator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pf = ParticleFilter(batt, x_guess)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up Predictor\n", "\n", "Now that we know how to do state estimation, the next key component of prognostics is [prediction](https://nasa.github.io/progpy/prog_algs_guide.html#prediction). ProgPy includes multiple predictors, and we'll implement a [Monte Carlo](https://nasa.github.io/progpy/api_ref/progpy/Predictor.html?highlight=monte%20carlo#included-predictors) predictor here. Let's load the necessary imports. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.predictors import MonteCarlo" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, a key factor in modeling any real-world application is noise. See the ProgPy [noise documentation](https://nasa.github.io/progpy/prog_models_guide.html#noise) for a detailed description of different types of noise and how to include it in the ProgPy architecture. Here, let's add some process and measurement noise into our system, to capture any uncertainties. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PROCESS_NOISE = 2e-4 # Percentage process noise\n", "MEASUREMENT_NOISE = 1e-4 # Percentage measurement noise\n", "\n", "# Apply process noise to state\n", "batt.parameters[\"process_noise\"] = {\n", " key: PROCESS_NOISE * value for key, value in initial_state.items()\n", "}\n", "\n", "# Apply measurement noise to output\n", "z0 = batt.output(initial_state)\n", "batt.parameters[\"measurement_noise\"] = {\n", " key: MEASUREMENT_NOISE * value for key, value in z0.items()\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's set up our predictor. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mc = MonteCarlo(batt)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To perform the prediction, we need to specify a few things, including the number of samples we want to use for the prediction, the step size for the prediction, and the prediction horizon (i.e., the time value to predict to)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "NUM_SAMPLES = 100\n", "STEP_SIZE = 1\n", "PREDICTION_HORIZON = random_walk_dataset[\"absoluteTime\"].iloc[-1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We also need to define a future loading function based on the load in the dataset we are using. Let's extract the necessary information and define a function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Extract time and outputs from data\n", "times_rw = random_walk_dataset[\"absoluteTime\"]\n", "outputs_rw = [{\"v\": elem[1][\"voltage\"]} for elem in random_walk_dataset.iterrows()]\n", "\n", "# Define function\n", "import numpy as np\n", "\n", "\n", "def future_load_rw(t, x=None):\n", " current = np.interp(t, times_rw, random_walk_dataset[\"current\"])\n", " return {\"i\": current}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will also adjust the voltage threshold for the sake of a demo." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batt.parameters[\"VEOD\"] = 3.3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With this, we are ready to predict. Let's pull it all together" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prognostics Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now it's time to put it all together.\n", "\n", "Typically in a fielded system predictions do not occur every time there is a state estimation. Instead, state estimation happens whenever there's new data, and prediction happens at some lower frequency. \n", "\n", "In some cases the update frequency may be in wall clock time, or after every operational period (e.g., flight). Predictions can also be triggered (or made more frequently) by proximity to event or by the output of a diagnoser. \n", "\n", "In this case, we are specifying a certain number of update steps between predictions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PREDICTION_UPDATE_FREQ = 50" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's initialize a data structure for storing the results, using the following built-in class:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.predictors import ToEPredictionProfile\n", "\n", "profile = ToEPredictionProfile()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll perform prognostics. We'll loop through the playback data, estimating the state at each time step, and making a prediction at the `PREDICTION_UPDATE_FREQ`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Loop through time\n", "for ind in range(3, random_walk_dataset.shape[0]):\n", " # Extract data\n", " t = random_walk_dataset[\"absoluteTime\"][ind]\n", " i = {\"i\": random_walk_dataset[\"current\"][ind]}\n", " z = {\n", " \"t\": random_walk_dataset[\"temperature\"][ind],\n", " \"v\": random_walk_dataset[\"voltage\"][ind],\n", " }\n", "\n", " # Perform state estimation\n", " pf.estimate(t, i, z)\n", " eod = batt.event_state(pf.x.mean)[\"EOD\"]\n", " print(\" - Event State: \", eod)\n", "\n", " # Prediction step (at specified frequency)\n", " if ind % PREDICTION_UPDATE_FREQ == 0:\n", " # Perform prediction\n", " mc_results = mc.predict(\n", " pf.x,\n", " future_load_rw,\n", " t0=t,\n", " n_samples=NUM_SAMPLES,\n", " dt=1,\n", " horizon=PREDICTION_HORIZON,\n", " const_load=True,\n", " )\n", "\n", " # Calculate metrics and print\n", " metrics = mc_results.time_of_event.metrics()\n", " print(\n", " \" - ToE: {} (sigma: {})\".format(\n", " metrics[\"EOD\"][\"mean\"], metrics[\"EOD\"][\"std\"]\n", " )\n", " )\n", "\n", " # Save results\n", " profile.add_prediction(t, mc_results.time_of_event)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is an example with playback data. In a real application, the state estimator would be listening to data from a data stream and would be publishing the results to some consumer (e.g., a data bus or directly updating a dispaly)\n", "\n", "With our prognostics results, we can now calculate some metrics to analyze the accuracy. We'll start by calculating the cumulative relative accuracy given the ground truth value. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "GROUND_TRUTH = {\"EOD\": 1600}\n", "cra = profile.cumulative_relative_accuracy(GROUND_TRUTH)\n", "\n", "print(f\"Cumulative Relative Accuracy for 'EOD': {cra['EOD']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll also generate some plots of the results, given a specific ground truth." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ALPHA = 0.05\n", "playback_plots = profile.plot(GROUND_TRUTH, ALPHA, True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion\n", "\n", "In this notebook, we were able to demonstrate how to use ProgPy to build an integrated prognostics example with a state estimator and predictor. In the next notebook __[10 Prognostics Server](10_Prognostics%20Server.ipynb)__, we will be exploring how to use the ProgPy server." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.10.7 64-bit", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.10.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "610c699f0cd8c4f129acd9140687fff6866bed0eb8e82f249fc8848b827b628c" } } }, "nbformat": 4, "nbformat_minor": 2 }