{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# \"We'll Cross the Streams\": Combining Asynchronous Data Streams\n", "\n", "![Ghostbusters reference](https://media.giphy.com/media/3o72EWUgbRNfLegO1W/giphy.gif)\n", "\n", "![Ghostbusters reference](https://thumbs.gfycat.com/ThirstyEnchantedCaudata-size_restricted.gif)\n", "\n", "In this notebook you will:\n", "\n", "* Acquire multiple asynchronous \"streams\" of data.\n", "* Align them based on time and perform basic computations, like normalization.\n", "\n", "Recommended Prerequisites:\n", "\n", "* [Process Tabular Data with Pandas](./Process%20Tabular%20Data%20with%20Pandas.ipynb)\n", "\n", "## Configuration\n", "Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors and detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with `RUNNING` on each line. In the event of a problem, edit this command to replace `status` with `restart all` and run it again.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!supervisorctl -c supervisor/supervisord.conf status" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run scripts/beamline_configuration.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitor current asynchronously while counting the detector" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\"Monitoring\" means requesting updates from the device whenever something changes beyond some tolerance (\"deadband\"). 
This is different from the plans we have encountered so far because bluesky leaves the update rate up to the device, so the number of readings is neither fixed nor necessarily repeatable from one run to the next.\n", "\n", "Monitoring can be done on a one-off basis, but it's typically set up in a semi-permanent way: you want to \"set it and forget it.\" The ``sd`` object keeps track of things to monitor concurrently with other measurements. We'll add the beam current signal ``I`` to that list." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sd" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sd.monitors.append(I)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, while we scan a motor and read a detector, ``I`` will be monitored in the background." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "RE(scan([slit], motor_slit, -15, 15, 150))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the data as before. The dataset is large, so we'll use the `.head()` method to show just the first several rows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "header = db[-1]\n", "header.table().head() # shows the 'primary' stream by default" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "header.table(stream_name='primary').head() # equivalent to the above" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What other streams are there?" 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "header.stream_names" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See the figure on [this documentation page](https://nsls-ii.github.io/architecture-overview.html) to visualize \"streams.\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "header.table('I_monitor').head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Plot data streams together\n", "\n", "We can plot each stream against time." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pandas.plotting import register_matplotlib_converters\n", "register_matplotlib_converters()\n", "plt.figure()\n", "plt.plot('time', 'slit_det', data=header.table(), marker='o', label='slit_det')\n", "plt.plot('time', 'I', data=header.table(stream_name='I_monitor'), marker='x', label='I')\n", "plt.legend()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We cannot plot ``slit_det`` vs ``I`` or normalize ``slit_det`` by ``I`` because the two were never measured at exactly the same time. We'll have to interpolate, downsample, or use some combination of the two to get the measurements into one unified time series." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## \"Muxing\" (combining into one time series) the streams" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To start, we'll use a pandas function to concatenate the two tables. Because the streams have different data columns, the result is a sort of \"block matrix\" padded with missing data (NaN)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "data = pd.concat([header.table('primary'), header.table('I_monitor')], axis=0, sort=False)\n", "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Make 'time' the index and sort on it.\n", "sorted_data = data.set_index('time').sort_index()\n", "sorted_data.head(20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Crude but conceptually simple: interpolate everything with \"forward-fill\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ffilled_data = sorted_data.ffill()\n", "ffilled_data.head(20)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ffilled_data['normalized'] = ffilled_data['slit_det'] / ffilled_data['I'] * ffilled_data['I'].mean()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.figure()\n", "plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')\n", "plt.plot('motor_slit', 'normalized', data=ffilled_data, label='interpolated and normalized')\n", "plt.legend()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### More accurate: Use linear interpolation instead of \"forward-fill\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interp_data = sorted_data.interpolate('linear')\n", "interp_data['normalized'] = interp_data['slit_det'] / interp_data['I'] * interp_data['I'].mean()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.figure()\n", "plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')\n", "plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')\n", "plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')\n", "plt.legend()" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "### More accurate: First smooth current with a \"rolling\" (windowed) mean" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Smooth the current with a 3-point rolling mean, then combine and interpolate as before.\n", "sm_I = header.table('I_monitor').set_index('time')['I'].rolling(window=3).mean()\n", "sm_data = pd.concat([header.table('primary').set_index('time'), pd.DataFrame({'I': sm_I})], axis=0, sort=False)\n", "sorted_sm_data = sm_data.sort_index()\n", "interp_sm_data = sorted_sm_data.interpolate('linear')\n", "interp_sm_data['normalized'] = interp_sm_data['slit_det'] / interp_sm_data['I'] * interp_sm_data['I'].mean()\n", "\n", "plt.figure()\n", "plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')\n", "plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')\n", "plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')\n", "plt.plot('motor_slit', 'normalized', data=interp_sm_data, label='smoothed + linear')\n", "plt.legend()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Histogram of time-spacing between current readings" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "current_reading_times = header.table('I_monitor')['time']\n", "delta = current_reading_times.diff()  # time elapsed since the previous reading" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "delta.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "delta.dropna().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Matplotlib can't histogram these time differences directly, so we convert them to integers (units of nanoseconds)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.figure()\n", "plt.hist(delta.dropna().astype('int64'))  # int64 holds nanosecond counts on every platform\n", "plt.xlabel('nanoseconds between readings')\n", "plt.ylabel('counts')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Q1. Pandas DataFrames have a ``.diff()`` method, which computes the difference between each row and the one before it. Example:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame({'a': [1, 2, 4, 8]})\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.diff()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use this (and something else you have already seen...) to compute the average time-spacing of current readings." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Type your answer here. Fill in the blanks.\n", "# current_reading_times = header.table('I_monitor')['time']\n", "# current_reading_times._____._____" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load solutions/compute_mean_period.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Q2. Above we used the ``interpolate`` method. It has several optional parameters. Use ``sorted_data.interpolate?`` to pull up the documentation and explore the options." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Type it here." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }