{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# HyperStream Tutorial 5: Workflows\n", "\n", "Workflows define a graph of streams. Usually, the first stream will be a special \"raw\" stream that pulls in data from a custom data source. Workflows can have multiple time ranges, which will cause the streams to be computed on all of the ranges given.\n", "\n", "## Introduction\n", "\n", "In this tutorial, we will be ussing a time-series dataset about the temperature in different countries and cities. The dataset is availabel at [The Census at School New Zeland][1]. The necessary files for this tutorial are already included in the folder **data/TimeSeriesDatasets_130207**.\n", "\n", "In particular, there are four files with the minimum and maximum temperatures in different cities of Asia, Australia, NZ and USA from 2000 to 2012. And the rainfall levels of New Zeland. \n", "\n", "![workflows](images/workflow_world_temp.svg)\n", "\n", "[1]: http://new.censusatschool.org.nz/resource/time-series-data-sets-2013/" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPython 2.7.6\n", "IPython 5.3.0\n", "\n", "hyperstream 0.3.0-beta\n", "\n", "compiler : GCC 4.8.4\n", "system : Linux\n", "release : 3.19.0-80-generic\n", "machine : x86_64\n", "processor : x86_64\n", "CPU cores : 4\n", "interpreter: 64bit\n", "Git hash : f0e911526041b91fe7999a8968c80618d410e741\n", "HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n" ] } ], "source": [ "%load_ext watermark\n", "\n", "import sys\n", "sys.path.append(\"../\") # Add parent dir in the Path\n", "\n", "from hyperstream import HyperStream\n", "from hyperstream import TimeInterval\n", "from hyperstream.utils import UTC\n", "\n", "from datetime import datetime\n", "from utils import plot_high_chart\n", "from utils import plot_multiple_stock\n", "from dateutil.parser import parse\n", "\n", "%watermark -v -m -p hyperstream -g\n", "\n", "hs = HyperStream(loglevel=20)\n", "print hs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading the data\n", "\n", "In the data folder there are four csv files with the names **TempAsia.csv, TempAustralia.csv, TempNZ.csv and TempUSA.csv**. The first column of each csv file contains a header with the names of the columns. The first one being the date and the following are the minimum and maximum temperature in different cities with the format **cityMin** and **cityMax**.\n", "\n", "Here is an example of the first 5 rows of the **TempAsia.csv** file:\n", "\n", "```\n", "Date,TokyoMax,TokyoMin,BangkokMax,BangkokMin\n", "2000M01,11.2,4.2,32.8,24\n", "```\n", "\n", "The format of the date has the form **YYYYMmm** where **YYYY** is the year and **mm** is the month. Because this format is not recognized by the default parser of the **csv_reader** tool, we will need to specify our own parser that first replaces the **M** by an hyphen **-** and then applies the **dateutils.parser**.\n", "\n", "Then, we will use a tool to read each csv, and a Stream to store all the results of applying the tool. When we specify to the tool that there is a header row in the csv file, the value of each Stream instance will be a dictionary with the name of the column and its corresponding value. 
"For example, a Stream instance with the four columns shown above will look like:\n", "\n", "```\n", "[2000-01-19 00:00:00+00:00]: {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMax': 11.2, 'TokyoMin': 4.2}\n", "```" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def dateparser(dt):\n", "    return parse(dt.replace('M', '-')).replace(tzinfo=UTC)\n", "\n", "ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),\n", "                      datetime(2013, 1, 1).replace(tzinfo=UTC))\n", "ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),\n", "                         datetime(2007, 3, 1).replace(tzinfo=UTC))\n", "\n", "# M will be the Memory Channel\n", "M = hs.channel_manager.memory\n", "\n", "countries = ['Asia', 'Australia', 'NZ', 'USA']\n", "temp_tools_csv = {}\n", "temp_streams = {}\n", "for country in countries:\n", "    temp_tools_csv[country] = hs.plugins.example.tools.csv_reader(\n", "        'data/TimeSeriesDatasets_130207/Temp{}.csv'.format(country),\n", "        header=True, dateparser=dateparser)\n", "    temp_streams[country] = M.get_or_create_stream(country)\n", "    temp_tools_csv[country].execute(sources=[], sink=temp_streams[country],\n", "                                    interval=ti_all)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Print one Stream Instance per Stream\n", "\n", "Now that we have generated one Stream for each country, we can inspect the first Stream Instance of each Stream." ] },
{ "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Asia: First Stream Instance\n", "[2000-01-21 00:00:00+00:00]: {'NewDelhiMax': 20.1, 'NewDelhiMin': 8.1, 'HongKongMin': 14.3, 'KualaLumpurMin': 23.5, 'TokyoMax': 11.2, 'KualaLumpurMax': 32.2, 'HongKongMax': 19.5, 'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2}\n", "\n", "Australia: First Stream Instance\n", "[2000-01-21 00:00:00+00:00]: {'BrisbaneMax': 28.2, 'MelbourneMin': 15.9, 'Melbournemax': 24.3, 'BrisbaneMin': 19.1, 'CanberraMin': 10.1, 'GoldCoastMax': 27.5, 'SydneyMax': 24.9, 'Canberramax': 24.5, 'GoldCoastMin': 20.2, 'SydneyMin': 17.7}\n", "\n", "NZ: First Stream Instance\n", "[2000-01-21 00:00:00+00:00]: {'WellingtonMin': 14.2, 'ChristchurchMin': 10.8, 'HamiltonMin': 12.4, 'DunedinMax': 18.2, 'WellingtonMax': 20.0, 'ChristchurchMax': 20.2, 'AucklandMax': 23.4, 'HamiltonMax': 23.8, 'DunedinMin': 8.8, 'AucklandMin': 15.5}\n", "\n", "USA: First Stream Instance\n", "[2000-01-21 00:00:00+00:00]: {'ChicagoMax': 1.8, 'LosAngelesMin': 10.0, 'HoustonMax': 21.6, 'NYMax': 4.6, 'SeattleMax': 7.9, 'SeattleMin': 1.4, 'ChicagoMin': -8.1, 'NYMin': -5.6, 'HoustonMin': 7.4, 'LosAngelesMax': 19.6}\n" ] } ], "source": [ "for country in countries:\n", "    # Print the first Stream Instance of each Stream\n", "    print('\\n{}: First Stream Instance'.format(country))\n", "    key, value = temp_streams[country].window().first()\n", "    print '[%s]: %s' % (key, value)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize the temperatures in one Country\n", "\n", "Now we can visualize the temperatures of all the cities in one country. First, we will create a list of all the cities in one of the Streams by looking at the first Stream Instance. Then, we will create a list of lists containing the temperature values of each city, together with the corresponding times. Finally, we can use the **plot_multiple_stock** function created for this tutorial."
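, "\n", "After the loop in the next cell, the collected data will have the following shape (values taken from the first Stream Instance above; the lists are shortened here for illustration):\n", "\n", "```\n", "time = ['2000-01-21 00:00:00+00:00', ...]\n", "data = {'TokyoMax': [11.2, ...], 'TokyoMin': [4.2, ...], ...}\n", "```"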
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false, "scrolled": false }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Temperatures in Asia\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "country = countries[0]\n", "this_cities_list = [key for key, value in temp_streams[country].window().items()[0].value.iteritems()]\n", "\n", "data = {city:[] for city in this_cities_list}\n", "time = []\n", "for key, values in temp_streams[country].window().items():\n", " time.append(str(key))\n", " for city, temperature in values.iteritems():\n", " data[city].append(temperature)\n", " \n", "names = data.keys()\n", "data = [value for key, value in data.iteritems()]\n", " \n", "plot_multiple_stock(data, time=time, names=names, title='Temperatures in ' + country, ylabel='ºC')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[StreamInstance(timestamp=datetime.datetime(2013, 1, 1, 0, 0, tzinfo=), value={u'NewDelhiMax': u'NewDelhiMax', u'NewDelhiMin': u'NewDelhiMin', u'HongKongMin': u'HongKongMin', u'KualaLumpurMin': u'KualaLumpurMin', u'TokyoMax': u'TokyoMax', u'KualaLumpurMax': u'KualaLumpurMax', u'HongKongMax': u'HongKongMax', u'BangkokMin': u'BangkokMin', u'BangkokMax': u'BangkokMax', u'TokyoMin': u'TokyoMin'})]\n" ] } ], "source": [ "from hyperstream import StreamInstance\n", "from hyperstream import StreamId\n", "\n", "one_country_stream = temp_streams[country]\n", "\n", "# It is similar to a database channel\n", "A = hs.channel_manager.assets\n", "this_cities_stream = A.get_or_create_stream('cities_{}'.format(country))\n", "\n", "mapping = {}\n", "for city in this_cities_list:\n", " mapping[city] = city\n", "\n", "A.write_to_stream(stream_id=this_cities_stream.stream_id, data=StreamInstance(ti_all.end, mapping))\n", "\n", "print this_cities_stream.window(TimeInterval.up_to_now()).items()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "for city in this_cities_list:\n", " if not hs.plate_manager.meta_data_manager.contains(identifier='city_'+city):\n", " print(\"Adding \" + city)\n", " hs.plate_manager.meta_data_manager.insert(parent='root', data=city,\n", " tag='city', identifier='city_'+city)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Adding NewDelhiMax\n", "Adding NewDelhiMin\n", "Adding HongKongMin\n", "Adding KualaLumpurMin\n", "Adding TokyoMax\n", "Adding KualaLumpurMax\n", "Adding HongKongMax\n", "Adding BangkokMin\n", "Adding BangkokMax\n", "Adding TokyoMin\n" ] } ], "source": [ "cities_plate = hs.plate_manager.create_plate(plate_id='C', meta_data_id='city', parent_plate=None, \n", " values=[], complement=True, description='Cities')\n", "this_country_temps = []\n", "for city in this_cities_list:\n", " print(\"Adding \" + city)\n", " this_country_temps.append(M.get_or_create_stream(stream_id=StreamId(name='temperature',\n", " meta_data=(('city', city),))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is possible to create all the Streams passing a list to the splitter tool **splitter_from_list**. However, this could not be automated in a workflow\n", "\n", "```Python\n", "# TODO Ussing this new tool, it is not necessary to create a new stream. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "It is possible to create all the Streams by passing a list to the splitter tool **splitter_from_list**. However, this cannot be automated in a workflow:\n", "\n", "```Python\n", "# TODO: Using this new tool, it is not necessary to create a new stream. However,\n", "# if it were a Stream, it could be automated for any other country\n", "splitter_tool = hs.plugins.example.tools.splitter_from_list(element=None)\n", "\n", "# TODO: try to change the MultiOutputTool parameter name splitting_stream to splitting_parameter,\n", "# or something that does not force you to think that it is a stream\n", "splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_list, output_plate=cities_plate,\n", "                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)\n", "```\n", "\n", "**TODO: Ask Tom: Question:** With the splitter_from_stream version we still need to create a list with the mapping, so I cannot see the difference between using one method or the other.\n", "\n", "**Answer:** The list in the Stream is allowed to change over time, which makes the resulting Streams more robust to change (e.g. the number of houses in the SPHERE project). Also, there is a tool that uses a dictionary as the splitting criterion, **splitter_of_dict**, which expects the splitter_stream to be None and a mapping parameter containing the static mapping." ] },
{ "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [], "source": [ "splitter_tool = hs.tools.splitter_from_stream(element=None, use_mapping_keys_only=False)\n", "\n", "splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_stream, output_plate=cities_plate,\n", "                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)" ] },
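{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check, each per-city sink Stream should now contain that city's column of the country Stream as plain values. A sketch, using only calls already introduced above:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# The first per-city Stream should hold plain temperature values, not dictionaries\n", "print this_country_temps[0].window(ti_all).first()" ] },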
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "one_city = this_country_temps[0]\n", "city_name = one_city.stream_id.meta_data[0][1]\n", "my_time, my_data = zip(*[(key.__str__(), value) for key, value in one_city.window(ti_all).items()])\n", "\n", "plot_high_chart(my_time, my_data, type=\"high_stock\", title='Temperature in {}'.format(city_name), yax='ºC')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Put this all together as a workflow" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": true }, "outputs": [], "source": [ "w = hs.create_workflow(\n", " workflow_id=\"world_climate\",\n", " name=\"World climate data and statistics\", \n", " description=\"Climate data statistics of ceveral cities from Asia, Australia, New Zeland and USA\",\n", " owner=\"Miquel\",\n", " online=False,\n", " monitor=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## First create the nodes\n", "\n", "Nodes correspond to the Streams that we used above." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "ename": "NameError", "evalue": "name 'S' is not defined", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m nodes = (\n\u001b[0;32m----> 5\u001b[0;31m \u001b[0mNodeDef\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mS\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"wearable\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m\"H\"\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 6\u001b[0m \u001b[0mNodeDef\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mM\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"wearable_xl1\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m\"H\"\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0mNodeDef\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mM\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"window_5\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m\"H\"\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mNameError\u001b[0m: name 'S' is not defined" ] } ], "source": [ "from collections import namedtuple\n", "NodeDef = namedtuple('NodeDef', ['channel', 'stream_name', 'plate_ids'], verbose=False)\n", "\n", "nodes = (\n", " NodeDef(S, \"wearable\", [\"H\"]),\n", " NodeDef(M, \"wearable_xl1\", [\"H\"]),\n", " NodeDef(M, \"window_5\", [\"H\"]),\n", " NodeDef(M, \"window_300\", [\"H\"]),\n", " NodeDef(M, \"arm_angle\", [\"H\"]),\n", " NodeDef(M, \"inactivity\", [\"H\"])\n", ")\n", "\n", "# Simple object to hold nodes\n", "class NodeCollection(object): \n", " pass\n", "\n", "N = NodeCollection()\n", "\n", "for n in nodes:\n", " setattr(N, n.stream_name, w.create_node(channel=n.channel, stream_name=n.stream_name, plate_ids=n.plate_ids))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", 
"mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }