{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "Example building a full model with events and thresholds using LSTMStateTransitionModel. \n",
    "\n",
    "In this example, we generate fake data using the ThrownObject model. This is a case where we're generating a surrogate model from the physics-based model. For cases where you're generating a model from data (e.g., collected from a testbed or a real-world environment), you'll replace that generated data with your own. \n",
    "\n",
    "We then create a subclass of the LSTMStateTransitionModel, defining the event_state and threshold equations as a function of output. We use the generated model and compare to the original model.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "\n",
    "from prog_models.data_models import LSTMStateTransitionModel\n",
    "from prog_models.models import ThrownObject\n",
    "\n",
    "def run_example():\n",
    "    \"\"\"\n",
    "    Build an LSTM surrogate of ThrownObject, including events and thresholds,\n",
    "    then compare it to the original model at a timestep not used in training.\n",
    "    \"\"\"\n",
    "    # -----------------------------------------------------\n",
    "    # Method 1 - manual definition\n",
    "    # In this example we complete the model by manually defining event_state\n",
    "    # and threshold_met as functions of the output.\n",
    "    # -----------------------------------------------------\n",
    "    TIMESTEP = 0.01\n",
    "    m = ThrownObject()\n",
    "    def future_loading(t, x=None):\n",
    "        return m.InputContainer({})  # No input for thrown object\n",
    "\n",
    "    # Step 1: Generate data\n",
    "    # Simulate the original model at several timesteps, so the training data\n",
    "    # covers more than one value of dt\n",
    "    print('Generating data...')\n",
    "    training_dts = [TIMESTEP, TIMESTEP/2, TIMESTEP/4, TIMESTEP*2, TIMESTEP*4]\n",
    "    training_runs = [\n",
    "        m.simulate_to_threshold(future_loading, threshold_keys='impact', save_freq=dt, dt=dt)\n",
    "        for dt in training_dts\n",
    "    ]\n",
    "\n",
    "    # Step 2: Data Prep\n",
    "    # We need to add the timestep as an input (one row per saved step)\n",
    "    u_data = [\n",
    "        np.array([[dt] for _ in run.inputs])\n",
    "        for dt, run in zip(training_dts, training_runs)\n",
    "    ]\n",
    "\n",
    "    # In this case we are saying that velocity is directly measurable,\n",
    "    # unlike the original model. This is necessary to calculate the events.\n",
    "    # Since the outputs will then match the states, we pass in the states below\n",
    "    z_data = [run.states for run in training_runs]\n",
    "\n",
    "    # Step 3: Create model\n",
    "    print('Creating model...')\n",
    "\n",
    "    # Create a subclass of LSTMStateTransitionModel,\n",
    "    # overriding event-related methods and members\n",
    "    class LSTMThrownObject(LSTMStateTransitionModel):\n",
    "        \"\"\"LSTM surrogate of ThrownObject with event_state and threshold_met defined from the output.\"\"\"\n",
    "        events = [\n",
    "            'falling',  # Event- object is falling\n",
    "            'impact'  # Event- object has impacted ground\n",
    "        ]\n",
    "\n",
    "        def initialize(self, u=None, z=None):\n",
    "            # Add logic required for thrown object\n",
    "            self.max_x = 0.0  # Maximum altitude seen so far\n",
    "            return super().initialize(u, z)\n",
    "\n",
    "        def event_state(self, x):\n",
    "            # Using class name instead of self allows the class to be subclassed\n",
    "            z = LSTMThrownObject.output(self, x)\n",
    "            # Logic from ThrownObject.event_state, using output instead of state\n",
    "            self.max_x = max(self.max_x, z['x'])  # Maximum altitude\n",
    "            # max_x stays at 0 until a positive altitude is seen. Guard the\n",
    "            # division so that case reports impact (0) rather than dividing by zero\n",
    "            if self.max_x > 0:\n",
    "                impact = max(z['x']/self.max_x, 0)  # 1 until falling begins, then it's fraction of height\n",
    "            else:\n",
    "                impact = 0.0\n",
    "            # NOTE(review): this reads self.parameters['throwing_speed']; confirm\n",
    "            # that key is available on the LSTM model's parameters\n",
    "            return {\n",
    "                'falling': max(z['v']/self.parameters['throwing_speed'], 0),  # Throwing speed is max speed\n",
    "                'impact': impact\n",
    "            }\n",
    "\n",
    "        def threshold_met(self, x):\n",
    "            z = LSTMThrownObject.output(self, x)\n",
    "            # Logic from ThrownObject.threshold_met, using output instead of state\n",
    "            return {\n",
    "                'falling': z['v'] < 0,\n",
    "                'impact': z['x'] <= 0\n",
    "            }\n",
    "\n",
    "    # Step 4: Generate Model\n",
    "    print('Building model...')\n",
    "    m2 = LSTMThrownObject.from_data(\n",
    "        inputs=u_data,\n",
    "        outputs=z_data,\n",
    "        window=4,\n",
    "        epochs=30,\n",
    "        input_keys=['dt'],\n",
    "        output_keys=m.states)\n",
    "\n",
    "    # Step 5: Simulate with model\n",
    "    # The LSTM model takes the previous outputs and dt as its input, so the\n",
    "    # loading function steps the original model alongside it to supply them\n",
    "    t_counter = 0\n",
    "    x_counter = m.initialize()\n",
    "    def future_loading_lstm(t, x=None):\n",
    "        nonlocal t_counter, x_counter\n",
    "        dt = t - t_counter  # Time elapsed since the previous call\n",
    "        z = m2.InputContainer({'x_t-1': x_counter['x'], 'v_t-1': x_counter['v'], 'dt': dt})\n",
    "        x_counter = m.next_state(x_counter, future_loading(t), dt)\n",
    "        t_counter = t\n",
    "        return z\n",
    "\n",
    "    # Use new dt, not used in training\n",
    "    # Using a dt not used in training will demonstrate the model's\n",
    "    # ability to handle different timesteps not part of training set\n",
    "    test_dt = TIMESTEP*3\n",
    "    truth = m.simulate_to_threshold(future_loading, threshold_keys='impact', dt=test_dt, save_freq=test_dt)\n",
    "    lstm_results = m2.simulate_to_threshold(future_loading_lstm, threshold_keys='impact', dt=test_dt, save_freq=test_dt)\n",
    "\n",
    "    # Step 6: Compare Results\n",
    "    print('Comparing results...')\n",
    "    print('Predicted impact time:')\n",
    "    print('\\tOriginal: ', truth.times[-1])\n",
    "    print('\\tLSTM: ', lstm_results.times[-1])\n",
    "    truth.outputs.plot(title='original model')\n",
    "    lstm_results.outputs.plot(title='generated model')\n",
    "    plt.show()\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    run_example()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}