{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Working with Streaming Data\n", "\n", "\"Streaming data\" is data that is continuously generated, often by some external source like a remote website, a measuring device, or a simulator. This kind of data is common for financial time series, web server logs, scientific applications, and many other situations. We have seen how to visualize any data output by a callable in the [Live Data](07-Live_Data.ipynb) user guide and we have also seen how to use the HoloViews stream system to push events in the user guide sections [Responding to Events](12-Responding_to_Events.ipynb) and [Custom Interactivity](13-Custom_Interactivity.ipynb).\n", "\n", "This user guide shows a third way of building an interactive plot, using ``DynamicMap`` and streams. Here, instead of pushing plot metadata (such as zoom ranges, user triggered events such as ``Tap`` and so on) to a ``DynamicMap`` callback, the underlying data in the visualized elements are updated directly using a HoloViews ``Stream``.\n", "\n", "In particular, we will show how the HoloViews ``Pipe`` and ``Buffer`` streams can be used to work with streaming data sources without having to fetch or generate the data from inside the ``DynamicMap`` callable." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "import numpy as np\n", "import pandas as pd\n", "\n", "import holoviews as hv\n", "from holoviews import opts\n", "from holoviews.streams import Buffer, Pipe\n", "\n", "hv.extension('bokeh')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ``Pipe``\n", "\n", "A ``Pipe`` allows data to be pushed into a DynamicMap callback to change a visualization, just like the streams in the [Responding to Events](./12-Responding_to_Events.ipynb) user guide were used to push changes to metadata that controlled the visualization. A ``Pipe`` can be used to push data of any type and make it available to a ``DynamicMap`` callback. Since all ``Element`` types accept ``data`` of various forms we can use ``Pipe`` to push data directly to the constructor of an ``Element`` through a DynamicMap.\n", "\n", "\n", "We can take advantage of the fact that most Elements can be instantiated without providing any data, so we declare the the ``Pipe`` with an empty list, declare the ``DynamicMap``, providing the pipe as a stream, which will dynamically update a ``VectorField`` :" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipe = Pipe(data=[])\n", "vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe])\n", "vector_dmap.opts(color='Magnitude', xlim=(-1, 1), ylim=(-1, 1))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Having set up this ``VectorField`` tied to a ``Pipe`` we can start pushing data to it varying the orientation of the VectorField:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x,y = np.mgrid[-10:11,-10:11] * 0.1\n", "sine_rings = np.sin(x**2+y**2)*np.pi+np.pi\n", "exp_falloff = 1/np.exp((x**2+y**2)/8)\n", "\n", "for i in np.linspace(0, 1, 25):\n", " time.sleep(0.1)\n", " pipe.send((x,y,sine_rings*i, exp_falloff))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This approach of using an element constructor directly does not allow you to use anything other than the default key and value dimensions. One simple workaround for this limitation is to use ``functools.partial`` as demonstrated in the **Controlling the length section** below.\n", "\n", "Since ``Pipe`` is completely general and the data can be any custom type, it provides a completely general mechanism to stream structured or unstructured data. Due to this generality, ``Pipe`` does not offer some of the more complex features and optimizations available when using the ``Buffer`` stream described in the next section." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ``Buffer``\n", "\n", "While ``Pipe`` provides a general solution for piping arbitrary data to ``DynamicMap`` callback, ``Buffer`` on the other hand provides a very powerful means of working with streaming tabular data, defined as pandas dataframes, arrays or dictionaries of columns (as well as StreamingDataFrame, which we will cover later). ``Buffer`` automatically accumulates the last ``N`` rows of the tabular data, where ``N`` is defined by the ``length``.\n", "\n", "The ability to accumulate data allows performing operations on a recent history of data, while plotting backends (such as bokeh) can optimize plot updates by sending just the latest patch. This optimization works only if the ``data`` object held by the ``Buffer`` is identical to the plotted ``Element`` data, otherwise all the data will be updated as normal." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### A simple example: Brownian motion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To initialize a ``Buffer`` we have to provide an example dataset which defines the columns and dtypes of the data we will be streaming. Next we define the ``length`` to keep the last 100 rows of data. If the data is a DataFrame we can specify whether we will also want to use the ``DataFrame`` ``index``. In this case we will simply define that we want to plot a ``DataFrame`` of 'x' and 'y' positions and a 'count' as ``Points`` and ``Curve`` elements:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])\n", "dfstream = Buffer(example, length=100, index=False)\n", "curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream])\n", "point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After applying some styling we will display an ``Overlay`` of the dynamic ``Curve`` and ``Points``" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "(curve_dmap * point_dmap).opts(\n", " opts.Points(color='count', line_color='black', size=5, padding=0.1, xaxis=None, yaxis=None),\n", " opts.Curve(line_width=1, color='black'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have set up the ``Buffer`` and defined a ``DynamicMap`` to plot the data we can start pushing data to it. We will define a simple function which simulates brownian motion by accumulating x, y positions. We can ``send`` data through the ``hv.streams.Buffer`` directly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def gen_brownian():\n", " x, y, count = 0, 0, 0\n", " while True:\n", " x += np.random.randn()\n", " y += np.random.randn()\n", " count += 1\n", " yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count'])\n", "\n", "brownian = gen_brownian()\n", "for _ in range(200):\n", " dfstream.send(next(brownian))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally we can clear the data on the stream and plot using the ``clear`` method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dfstream.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that when using the ``Buffer`` stream the view will always follow the current range of the data by default, by setting ``buffer.following=False`` or passing following as an argument to the constructor this behavior may be disabled." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Asynchronous updates using asyncio\n", "\n", "In most cases, instead of pushing updates manually from the same Python process, you'll want the object to update asynchronously as new data arrives. Since both Jupyter and Bokeh server run on an `asyncio` event-loop in both cases to define a non-blocking co-routine that can push data to our stream whenever it is ready. We can define an asynchronous functionwith a `asyncio.sleep` timeout and schedule it as a task. Once we have declared the callback we can call ``start`` to begin emitting events:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "\n", "count = 0\n", "buffer = Buffer(np.zeros((0, 2)), length=50)\n", "\n", "async def f():\n", " global count\n", " while True:\n", " await asyncio.sleep(0.1)\n", " count += 1\n", " buffer.send(np.array([[count, np.random.rand()]]))\n", "\n", "task = asyncio.create_task(f())\n", "\n", "hv.DynamicMap(hv.Curve, streams=[buffer]).opts(padding=0.1, width=600)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since the callback is non-blocking we can continue working in the notebook and execute other cells. Once we're done we can stop the callback by calling ``cb.stop()``." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "task.cancel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Real examples\n", "\n", "Using the ``Pipe`` and ``Buffer`` streams we can create complex streaming plots very easily. In addition to the toy examples we presented in this guide it is worth looking at looking at some of the examples using real, live, streaming data.\n", "\n", "* The [streaming_psutil](https://holoviews.org/gallery/apps/bokeh/streaming_psutil.html) bokeh app is one such example which display CPU and memory information using the ``psutil`` library (install with ``pip install psutil`` or ``conda install psutil``)\n", "\n", "\n", "\n", "As you can see, streaming data works like streams in HoloViews in general, flexibly handling changes over time under either explicit control or governed by some external data source." ] } ], "metadata": { "language_info": { "name": "python", "pygments_lexer": "ipython3" } }, "nbformat": 4, "nbformat_minor": 4 }