{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Holoviews 'streams demo'\n", "\n", "#### from https://github.com/ioam/holoviews/blob/master/examples/user_guide/15-Streaming_Data.ipynb\n", "#### (Holoviews/Datashader/Bokeh/Jupyter Notebook)\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# These are examples from Holoviews developer Philipp Rudiger:\n", "# https://anaconda.org/philippjfr/working_with_streaming_data/notebook\n", "# As of 2017-10-26:\n", "# . this is bleeding-edge,\n", "# . this Notebook checks that it also works well on Windows / WinPython.\n", "#\n", "# Users may notice we're getting closer to a PyQtGraph style of graphics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "import numpy as np\n", "import pandas as pd\n", "import holoviews as hv\n", "\n", "from holoviews.streams import Pipe, Buffer\n", "\n", "import streamz\n", "import streamz.dataframe\n", "\n", "hv.extension('bokeh')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipe\n", "\n", "A Pipe allows data to be pushed into a DynamicMap callback to change a visualization." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the Pipe and attach it to a DynamicMap\n", "pipe = Pipe(data=[])\n", "vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe])\n", "vector_dmap.redim.range(x=(-1, 1), y=(-1, 1))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Feed the pipe: each send() updates the plot above\n", "x,y = np.mgrid[-10:11,-10:11] * 0.1\n", "sine_rings = np.sin(x**2+y**2)*np.pi+np.pi\n", "exp_falloff = 1/np.exp((x**2+y**2)/8)\n", "\n", "for i in np.linspace(0, 1, 25):\n", "    time.sleep(0.1)\n", "    pipe.send([x,y,sine_rings*i, exp_falloff])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Buffer\n", "Buffer automatically accumulates the last N rows of the tabular data, where N 
is defined by the ``length`` argument.\n", "\n", "Plotting backends (such as Bokeh) can optimize plot updates by sending just the latest patch. This optimization works only if the data object held by the Buffer is identical to the plotted Element data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])\n", "dfstream = Buffer(example, length=100, index=False)\n", "curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream])\n", "point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%opts Points [color_index='count', xaxis=None, yaxis=None] (line_color='black', size=5)\n", "%%opts Curve (line_width=1, color='black')\n", "curve_dmap * point_dmap" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def gen_brownian():\n", "    # Yield one-row DataFrames tracing a 2D random walk\n", "    x, y, count = 0, 0, 0\n", "    while True:\n", "        x += np.random.randn()\n", "        y += np.random.randn()\n", "        count += 1\n", "        yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count'])\n", "\n", "brownian = gen_brownian()\n", "for i in range(200):\n", "    dfstream.send(next(brownian))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dfstream.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Asynchronous updates using the Streamz library\n", "\n", "Let's start with a fairly simple example:\n", "- Declare a streamz.Stream and a Pipe object and connect them into a pipeline into which we can push data.\n", "- Use a sliding_window of 10, which will first wait for 10 sets of stream updates to accumulate. 
At that point and for every subsequent update, it will apply pd.concat to combine the most recent 10 updates into a new DataFrame.\n", "- Use the sink method on the streamz.Stream to send the resulting collection of 10 updates to Pipe.\n", "- Declare a DynamicMap that takes the sliding window of concatenated DataFrames and displays it using a Scatter Element.\n", "- Color the Scatter points by their 'count' and set a range, then display:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "point_source = streamz.Stream()\n", "pipe = Pipe(data=[])\n", "point_source.sliding_window(10).map(pd.concat).sink(pipe.send) # Connect streamz to the Pipe\n", "scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%opts Scatter [color_index='count', bgcolor='black']\n", "scatter_dmap.redim.range(y=(-4, 4))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in range(100):\n", "    df = pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},\n", "                      columns=['x', 'y', 'count'])\n", "    point_source.emit(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### StreamingDataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### A simple example" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')\n", "print(simple_sdf.index)\n", "simple_sdf.example.dtypes" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%opts Curve [width=500 show_grid=True]\n", "sdf = (simple_sdf-0.5).cumsum()\n", "hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The ``Random`` StreamingDataFrame will asynchronously emit events until it is stopped, 
which we can do by calling the ``stop`` method." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "simple_sdf.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Making use of the StreamingDataFrame API" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%opts Curve [width=500 show_grid=True]\n", "source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')\n", "sdf = (source_df-0.5).cumsum()\n", "raw_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])\n", "smooth_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x.rolling('500ms').mean())])\n", "\n", "raw_dmap.relabel('raw') * smooth_dmap.relabel('smooth')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "source_df.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Controlling the backlog" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from functools import partial\n", "multi_source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n", "sdf = (multi_source-0.5).cumsum()\n", "hv.DynamicMap(hv.Table, streams=[Buffer(sdf.x, length=10)]) +\\\n", "hv.DynamicMap(partial(hv.BoxWhisker, kdims=[], vdims=['x']), streams=[Buffer(sdf.x, length=100)])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Updating multiple cells" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since a ``StreamingDataFrame`` will emit data until it is stopped we can subscribe multiple plots across different cells to the same stream:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hv.DynamicMap(hv.Scatter, streams=[Buffer(sdf.x)])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "multi_source.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Applying 
operations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n", "sdf = (hist_source-0.5).cumsum()\n", "dmap = hv.DynamicMap(hv.Dataset, streams=[Buffer(sdf.x, length=500)])\n", "hv.operation.histogram(dmap, dimension='x')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hist_source.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Datashading" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The same approach will also work for the datashader operation letting us datashade the entire ``backlog`` window even if we make it very large:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%opts RGB [width=600]\n", "from holoviews.operation.datashader import datashade\n", "from bokeh.palettes import Blues8\n", "large_source = streamz.dataframe.Random(freq='100us', interval='200ms')\n", "sdf = (large_source-0.5).cumsum()\n", "dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x, length=100000)])\n", "datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "large_source.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" } }, "nbformat": 4, "nbformat_minor": 2 }