{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Integrated Prognostics Example\n", "\n", "This is an integrated example of Prognostics with ProgPy. This example is based on the prognostics example from the Tutorial at the 2024 PHM Society Conference." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data preparation\n", "First, we need to download the data we will use in this chapter. To do this we use the datasets subpackage in progpy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.datasets import nasa_battery\n", "(desc, data) = nasa_battery.load_data(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note, this downloads the battery data from the PCoE datasets:\n", "https://www.nasa.gov/intelligent-systems-division/discovery-and-systems-health/pcoe/pcoe-data-set-repository/ \n", "\n", "Let's prepare the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(desc['description'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset includes 4 different kinds of runs: trickle, step, reference, random walk. For this example we will use the trickle dataset. \n", "\n", "The dataset includes 4 columns: relativeTime, current, voltage, and temperature. relativeTime is the time in a specific \"run\" (i.e., with one current draw). To use the random walk dataset, we need to concatenate multiple runs. To support this, we add a new column, absoluteTime, which shows time in the dataset (instead of run)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data[35]['absoluteTime'] = data[35]['relativeTime']\n", "for i in range(36, 50):\n", " data[i]['absoluteTime'] = data[i]['relativeTime'] + data[i-1]['absoluteTime'].iloc[-1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we combine the data into a single dataset and investigate the results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "random_walk_dataset = pd.concat(data[35:50], ignore_index=True)\n", "print(random_walk_dataset)\n", "random_walk_dataset.plot(y=['current', 'voltage', 'temperature'], subplots=True, xlabel='Time (sec)')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the data is ready for this tutorial, let's dive into it." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setting up for Prognostics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To illustrate how to do prognostics, let's use the [Battery Electrochemistry model](https://nasa.github.io/progpy/api_ref/progpy/IncludedModels.html#:~:text=class%20progpy.models.BatteryElectroChemEOD(**kwargs)). This model predicts the end-of-discharge of a Lithium-ion battery based on a set of differential equations that describe the electrochemistry of the system [Daigle et al. 2013](https://papers.phmsociety.org/index.php/phmconf/article/view/2252).\n", "\n", "First, lets setup the model.\n", "\n", "### Setup Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.models import BatteryElectroChemEOD\n", "batt: BatteryElectroChemEOD = BatteryElectroChemEOD()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will also update the Ro and qMobile parameters to better represent the age of the battery. See 02. Parameter Estimation notebook for examples on how to estimate model parameters. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batt['Ro'] = 0.15\n", "batt['qMobile'] = 7750" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The two basic components of prognostics are [state estimation and prediction](https://nasa.github.io/progpy/prog_algs_guide.html#state-estimation-and-prediction-guide). ProgPy includes functionality to do both. See 07. State Estimation and 08. Prediction for examples of this.\n", "\n", "First, let's setup our state estimator\n", "### Setup State Estimator" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from progpy.state_estimators import ParticleFilter\n", "from progpy.uncertain_data import MultivariateNormalDist" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "State estimators require an initial state. To define this, we'll first initialize the model and then define the initial state as a distribution of possible states around this using a multi-variate normal distribution. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "initial_state = batt.initialize() # Initialize model\n", "# Define distribution around initial state\n", "x_guess = MultivariateNormalDist(\n", " labels=initial_state.keys(),\n", " mean=initial_state.values(),\n", " covar=np.diag([max(1e-9, abs(x)) for x in initial_state.values()])\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With our initial distribution defined, we can now instantiate the state estimator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pf = ParticleFilter(batt, x_guess)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we should setup our predictor\n", "\n", "### Setup Predictor\n", "\n", "Now that we know how to do state estimation, the next key component of prognostics is [prediction](https://nasa.github.io/progpy/prog_algs_guide.html#prediction). ProgPy includes multiple predictors, and we'll implement a [Monte Carlo](https://nasa.github.io/progpy/api_ref/progpy/Predictor.html?highlight=monte%20carlo#included-predictors) predictor here. Let's load the necessary imports. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.predictors import MonteCarlo" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, a key factor in modeling any real-world application is noise. See the ProgPy [noise documentation](https://nasa.github.io/progpy/prog_models_guide.html#noise) for a detailed description of different types of noise and how to include it in the ProgPy architecture. Here, let's add some process and measurement noise into our system, to capture any uncertainties. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PROCESS_NOISE = 2e-4 # Percentage process noise\n", "MEASUREMENT_NOISE = 1e-4 # Percentage measurement noise\n", "\n", "# Apply process noise to state\n", "batt.parameters['process_noise'] = {key: PROCESS_NOISE * value for key, value in initial_state.items()}\n", "\n", "# Apply measurement noise to output\n", "z0 = batt.output(initial_state)\n", "batt.parameters['measurement_noise'] = {key: MEASUREMENT_NOISE * value for key, value in z0.items()}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's set up our predictor. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mc = MonteCarlo(batt)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To perform the prediction, we need to specify a few things, including the number of samples we want to use for the prediction, the step size for the prediction, and the prediction horizon (i.e., the time value to predict to)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "NUM_SAMPLES = 100\n", "STEP_SIZE = 1\n", "PREDICTION_HORIZON = random_walk_dataset['absoluteTime'].iloc[-1] " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We also need to define a future loading function based on the load in the dataset we are using. Let's extract the necessary information and define a function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Extract time and outputs from data\n", "times_rw = random_walk_dataset['absoluteTime']\n", "outputs_rw = [{'v': elem[1]['voltage']} for elem in random_walk_dataset.iterrows()]\n", "\n", "# Define function\n", "import numpy as np\n", "def future_load_rw(t, x=None):\n", " current = np.interp(t, times_rw, random_walk_dataset['current'])\n", " return {'i': current}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will also adjust the voltage threshold for the sake of a demo." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batt.parameters['VEOD'] = 3.3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With this, we are ready to predict. Let's pull it all together" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Putting it together- Prognostics Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now it's time to put it all together.\n", "\n", "Typically in a fielded system predictions do not occur every time there is a state estimation. Instead, state estimation happens whenever there's new data, and prediction happens at some lower frequency. \n", "\n", "In some cases the update frequency may be in wall clock time, or after every operational period (e.g., flight). Predictions can also be triggered (or made more frequently) by proximity to event or by the output of a diagnoser. \n", "\n", "In this case we are specifying a certain number of update steps between predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PREDICTION_UPDATE_FREQ = 50" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's initialize a data structure for storing the results, using the following built-in class:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from progpy.predictors import ToEPredictionProfile\n", "profile = ToEPredictionProfile()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll perform prognostics. We'll loop through the playback data, estimating the state at each time step, and making a prediction at the `PREDICTION_UPDATE_FREQ`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Loop through time\n", "for ind in range(3, random_walk_dataset.shape[0]):\n", " # Extract data\n", " t = random_walk_dataset['absoluteTime'][ind]\n", " i = {'i': random_walk_dataset['current'][ind]}\n", " z = {'t': random_walk_dataset['temperature'][ind], 'v': random_walk_dataset['voltage'][ind]}\n", "\n", " # Perform state estimation \n", " pf.estimate(t, i, z)\n", " eod = batt.event_state(pf.x.mean)['EOD']\n", " print(\" - Event State: \", eod)\n", "\n", " # Prediction step (at specified frequency)\n", " if (ind%PREDICTION_UPDATE_FREQ == 0):\n", " # Perform prediction\n", " mc_results = mc.predict(pf.x, future_load_rw, t0 = t, n_samples=NUM_SAMPLES, dt=1, horizon=PREDICTION_HORIZON, const_load=True)\n", " \n", " # Calculate metrics and print\n", " metrics = mc_results.time_of_event.metrics()\n", " print(' - ToE: {} (sigma: {})'.format(metrics['EOD']['mean'], metrics['EOD']['std']))\n", "\n", " # Save results\n", " profile.add_prediction(t, mc_results.time_of_event)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is an example with playback data. In a real application, the state estimator would be listening to data from a data stream and would be publishing the results to some consumer (e.g., a data bus or directly updating a dispaly)\n", "\n", "With our prognostics results, we can now calculate some metrics to analyze the accuracy.\n", "\n", "We'll start by calculating the cumulative relative accuracy given the ground truth value. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "GROUND_TRUTH= {'EOD': 1600} \n", "cra = profile.cumulative_relative_accuracy(GROUND_TRUTH)\n", "print(f\"Cumulative Relative Accuracy for 'EOD': {cra['EOD']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll also generate some plots of the results, given a specific ground truth" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ALPHA = 0.05\n", "playback_plots = profile.plot(GROUND_TRUTH, ALPHA, True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusions\n", "\n", "**TODO**" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.10.7 64-bit", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.10.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "610c699f0cd8c4f129acd9140687fff6866bed0eb8e82f249fc8848b827b628c" } } }, "nbformat": 4, "nbformat_minor": 2 }