{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# HyperStream Tutorial 3: Stream composition\n", "\n", "We will use the tool created in the previous tutorial and compose its output stream with a new tool, producing a new stream." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPython 2.7.6\n", "IPython 5.3.0\n", "\n", "hyperstream 0.3.0-beta\n", "\n", "compiler : GCC 4.8.4\n", "system : Linux\n", "release : 3.19.0-80-generic\n", "machine : x86_64\n", "processor : x86_64\n", "CPU cores : 4\n", "interpreter: 64bit\n", "Git hash : f0e911526041b91fe7999a8968c80618d410e741\n" ] } ], "source": [ "%load_ext watermark\n", "\n", "import sys\n", "from datetime import datetime\n", "\n", "sys.path.append(\"../\") # Add the parent directory to the module search path\n", "\n", "from hyperstream import HyperStream\n", "from hyperstream import TimeInterval\n", "from hyperstream.utils import UTC\n", "\n", "from utils import plot_high_chart\n", "\n", "%watermark -v -m -p hyperstream -g" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n", "[1990-02-01 00:00:00+00:00]: [13.33, 2.15]\n", "[1990-03-01 00:00:00+00:00]: [13.44, 2.71]\n", "[1990-04-01 00:00:00+00:00]: [12.16, 5.1]\n", "[1990-05-01 00:00:00+00:00]: [10.84, 7.37]\n", "[1990-06-01 00:00:00+00:00]: [9.12, 10.26]\n", "[1990-07-01 00:00:00+00:00]: [6.44, 12.17]\n", "[1990-08-01 00:00:00+00:00]: [4.92, 13.95]\n", "[1990-09-01 00:00:00+00:00]: [4.5, 14.3]\n", "[1990-10-01 00:00:00+00:00]: [6.67, 13.71]\n", "[1990-11-01 00:00:00+00:00]: [9.58, 11.24]\n" ] } ], "source": [ "hs = HyperStream(loglevel=20)\n", "print hs\n", "\n", "reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')\n", "sea_ice_stream = 
hs.channel_manager.memory.get_or_create_stream(\"sea_ice\")\n", "\n", "\n", "ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2012, 1, 1).replace(tzinfo=UTC))\n", "\n", "reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)\n", "for key, value in sea_ice_stream.window().items()[:10]:\n", " print '[%s]: %s' % (key, value)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Stream composition\n", "\n", "We can chain tools and streams to build a new stream. As an example, we use the tool **csv_reader** to generate a stream from a CSV file. Then we apply the tool **list_mean**, which computes the mean of the list of values in each instance of its input stream and writes the results to a new stream. Finally, the output stream can be stored either in **memory** or in a **MongoDB** database; here we store the final stream in MongoDB.\n", "\n", "|source||tool||stream||tool||stream|\n", "|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|\n", "| csv_file | $\\rightarrow$ | reader_tool | $\\rightarrow$ | sea_ice_stream | $\\rightarrow$ | list_mean_tool | $\\rightarrow$ | sea_ice_means_stream |\n", "|filesystem||memory||memory||memory||MongoDB|" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1990-02-01 00:00:00+00:00]: 7.74\n", "[1990-03-01 00:00:00+00:00]: 8.075\n", "[1990-04-01 00:00:00+00:00]: 8.63\n", "[1990-05-01 00:00:00+00:00]: 9.105\n", "[1990-06-01 00:00:00+00:00]: 9.69\n", "[1990-07-01 00:00:00+00:00]: 9.305\n", "[1990-08-01 00:00:00+00:00]: 9.435\n", "[1990-09-01 00:00:00+00:00]: 9.4\n", "[1990-10-01 00:00:00+00:00]: 10.19\n", "[1990-11-01 00:00:00+00:00]: 10.41\n" ] } ], "source": [ "list_mean_tool = hs.tools.list_mean()\n", "\n", "sea_ice_means_stream = hs.channel_manager.mongo.get_or_create_stream('sea_ice_means')\n", "list_mean_tool.execute(sources=[sea_ice_stream], 
sink=sea_ice_means_stream, interval=ti)\n", "\n", "for key, value in sea_ice_means_stream.window().items()[:10]:\n", " print '[%s]: %s' % (key, value)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualization\n", "\n", "We can now plot all the values in the last computed window. In this case there is only one window, containing all the data computed by the tool." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "Mean of Arctic and Antarctic sea ice\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "my_time, my_data = zip(*[(str(key), value) for key, value in sea_ice_means_stream.window().items()])\n", "\n", "plot_high_chart(my_time, my_data, type=\"high_stock\", \n", " title='Mean of Arctic and Antarctic sea ice', yax='meters')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }