{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# HyperStream Tutorial 5: Workflows\n", "\n", "Workflows define a graph of streams. Usually, the first stream will be a special \"raw\" stream that pulls in data from a custom data source. Workflows can have multiple time ranges, which will cause the streams to be computed on all of the ranges given.\n", "\n", "## Introduction\n", "\n", "In this tutorial, we will use a time-series dataset about the temperature in different countries and cities. The dataset is available at [The Census at School New Zealand][1]. The necessary files for this tutorial are already included in the folder **data/TimeSeriesDatasets_130207**.\n", "\n", "In particular, there are four files with the minimum and maximum temperatures in different cities of Asia, Australia, NZ and USA from 2000 to 2012, together with the rainfall levels of New Zealand.\n", "\n", "![workflows](images/workflow_world_temp.svg)\n", "\n", "[1]: http://new.censusatschool.org.nz/resource/time-series-data-sets-2013/" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPython 2.7.6\n", "IPython 5.4.1\n", "\n", "hyperstream 0.3.7\n", "\n", "compiler : GCC 4.8.4\n", "system : Linux\n", "release : 3.19.0-80-generic\n", "machine : x86_64\n", "processor : x86_64\n", "CPU cores : 4\n", "interpreter: 64bit\n", "Git hash : fb58a388c5f5e844032987ac5e180263e9637519\n", "HyperStream version 0.3.7, connected to mongodb://localhost:27017/hyperstream, session id \n", "[u'example', u'data_importers', u'data_generators']\n" ] } ], "source": [ "try:\n", " %load_ext watermark\n", " watermark = True\n", "except ImportError:\n", " watermark = False\n", " pass\n", "\n", "import sys\n", "sys.path.append(\"../\") # Add parent dir in the Path\n", "\n", "from hyperstream import HyperStream\n", "from hyperstream import TimeInterval\n", "from hyperstream.utils import UTC\n", "from 
hyperstream import Workflow\n", "import hyperstream\n", "\n", "from datetime import datetime\n", "from utils import plot_high_chart\n", "from utils import plot_multiple_stock\n", "from dateutil.parser import parse\n", "\n", "if watermark:\n", " %watermark -v -m -p hyperstream -g\n", "\n", "hs = HyperStream(loglevel=30)\n", "M = hs.channel_manager.memory\n", "print(hs)\n", "print([p.channel_id_prefix for p in hs.config.plugins])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading the data\n", "\n", "In the data folder there are four csv files with the names **TempAsia.csv, TempAustralia.csv, TempNZ.csv and TempUSA.csv**. The first row of each csv file is a header with the names of the columns: the first column is the date, and the following columns are the minimum and maximum temperatures in different cities, named **cityMin** and **cityMax**.\n", "\n", "Here is an example of the header and the first data row of the **TempAsia.csv** file (only two of the cities are shown):\n", "\n", "```\n", "Date,TokyoMax,TokyoMin,BangkokMax,BangkokMin\n", "2000M01,11.2,4.2,32.8,24\n", "```\n", "\n", "The format of the date has the form **YYYYMmm** where **YYYY** is the year and **mm** is the month. Because this format is not recognized by the default parser of the **csv_reader** tool, we will need to specify our own parser that first replaces the **M** with a hyphen **-** and then applies **dateutil.parser**.\n", "\n", "Then, we will use a tool to read each csv, and a Stream to store all the results of applying the tool. When we tell the tool that the csv file has a header row, the value of each Stream instance will be a dictionary that maps each column name to its corresponding value. 
For example, a Stream instance with the 4 columns shown above will look like:\n", "\n", "```\n", "[2000-01-19 00:00:00+00:00]: {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}\n", "```" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def dateparser(dt):\n", " return parse(dt.replace('M', '-')).replace(tzinfo=UTC)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the **csv_reader** has created the instances in the country plate, we will modify the dictionaries by applying the function **split_temperatures** to each instance and storing the results in a new stream **temp_data**.\n", "\n", "The function will create a dictionary with the city names and their minimum and maximum temperature. The following example shows the previous stream instance after applying this function:\n", "\n", "```\n", "[2000-01-19 00:00:00+00:00]: {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}\n", "```" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def split_temperatures(d):\n", " \"\"\"\n", " Parameters\n", " ----------\n", " d: dictionary of the following form:\n", " {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}\n", " Returns\n", " -------\n", " dictionary of the following form\n", " {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}\n", " \"\"\"\n", " new_d = {}\n", " for name, value in d.iteritems():\n", " key = name[-3:].lower()\n", " name = name[:-3]\n", " if name not in new_d:\n", " new_d[name] = {}\n", " new_d[name][key] = value\n", " return new_d" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, we will use a **splitter_from_stream** tool that will be applied to every **country** and store the values of the **temp_data** stream into the corresponding **city** nodes. 
The new city nodes will contain a dictionary with minimum and maximum values, in the form:\n", "\n", "```\n", "[2000-01-19 00:00:00+00:00]: {'min': 24.0, 'max': 32.8}\n", "```\n", "\n", "Then, we will apply the function **dict_mean** that will compute the mean of all the values in the dictionary and that we will store in the streams **city_avg_temp**.\n", "\n", "```\n", "[2000-01-19 00:00:00+00:00]: 28.4\n", "```" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def dict_mean(d):\n", " x = d.values()\n", " x = [value for value in x if value is not None]\n", " return float(sum(x)) / max(len(x), 1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create the plates and meta_data instances" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "root[root:None]\n", "╟── country[country_NZ:NZ]\n", "║ ╟── city[country_NZ.city_Auckland:Auckland]\n", "║ ╟── city[country_NZ.city_Christchurch:Christchurch]\n", "║ ╟── city[country_NZ.city_Dunedin:Dunedin]\n", "║ ╟── city[country_NZ.city_Hamilton:Hamilton]\n", "║ ╙── city[country_NZ.city_Wellington:Wellington]\n", "╟── country[country_Australia:Australia]\n", "║ ╟── city[country_Australia.city_Brisbane:Brisbane]\n", "║ ╟── city[country_Australia.city_Canberra:Canberra]\n", "║ ╟── city[country_Australia.city_GoldCoast:GoldCoast]\n", "║ ╟── city[country_Australia.city_Melbourne:Melbourne]\n", "║ ╙── city[country_Australia.city_Sydney:Sydney]\n", "╟── country[country_USA:USA]\n", "║ ╟── city[country_USA.city_Chicago:Chicago]\n", "║ ╟── city[country_USA.city_Houston:Houston]\n", "║ ╟── city[country_USA.city_LosAngeles:LosAngeles]\n", "║ ╟── city[country_USA.city_NY:NY]\n", "║ ╙── city[country_USA.city_Seattle:Seattle]\n", "╙── country[country_Asia:Asia]\n", " ╟── city[country_Asia.city_Bangkok:Bangkok]\n", " ╟── city[country_Asia.city_HongKong:HongKong]\n", " ╟── 
city[country_Asia.city_KualaLumpur:KualaLumpur]\n", " ╟── city[country_Asia.city_NewDelhi:NewDelhi]\n", " ╙── city[country_Asia.city_Tokyo:Tokyo]\n", "\n" ] } ], "source": [ "countries_dict = {\n", " 'Asia': ['Bangkok', 'HongKong', 'KualaLumpur', 'NewDelhi', 'Tokyo'],\n", " 'Australia': ['Brisbane', 'Canberra', 'GoldCoast', 'Melbourne', 'Sydney'],\n", " 'NZ': ['Auckland', 'Christchurch', 'Dunedin', 'Hamilton','Wellington'],\n", " 'USA': ['Chicago', 'Houston', 'LosAngeles', 'NY', 'Seattle']\n", "}\n", "\n", "# delete_plate requires child plates to be deleted before their parents\n", "for plate_id in ['C.C', 'C']:\n", " if plate_id in [plate[0] for plate in hs.plate_manager.plates.items()]:\n", " hs.plate_manager.delete_plate(plate_id=plate_id, delete_meta_data=True)\n", "\n", "for country in countries_dict:\n", " id_country = 'country_' + country\n", " if not hs.plate_manager.meta_data_manager.contains(identifier=id_country):\n", " hs.plate_manager.meta_data_manager.insert(\n", " parent='root', data=country, tag='country', identifier=id_country)\n", " for city in countries_dict[country]:\n", " id_city = id_country + '.' 
+ 'city_' + city\n", " if not hs.plate_manager.meta_data_manager.contains(identifier=id_city):\n", " hs.plate_manager.meta_data_manager.insert(\n", " parent=id_country, data=city, tag='city', identifier=id_city)\n", " \n", "C = hs.plate_manager.create_plate(plate_id=\"C\", description=\"Countries\", values=[], complement=True,\n", " parent_plate=None, meta_data_id=\"country\")\n", "CC = hs.plate_manager.create_plate(plate_id=\"C.C\", description=\"Cities\", values=[], complement=True,\n", " parent_plate=\"C\", meta_data_id=\"city\")\n", "\n", "print(hs.plate_manager.meta_data_manager.global_plate_definitions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create the workflow and execute it" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": true }, "outputs": [], "source": [ "ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),\n", " datetime(2013, 1, 1).replace(tzinfo=UTC))" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# parameters for the csv_multi_reader tool\n", "csv_temp_params = dict(\n", " filename_template='data/TimeSeriesDatasets_130207/Temp{}.csv',\n", " datetime_parser=dateparser, skip_rows=0, header=True)\n", "\n", "csv_rain_params = dict(\n", " filename_template='data/TimeSeriesDatasets_130207/{}Rainfall.csv',\n", " datetime_parser=dateparser, skip_rows=0, header=True)\n", "\n", "def mean(x):\n", " \"\"\"\n", " Computes the mean of the values in x, discarding the None values\n", " \"\"\"\n", " x = [value for value in x if value is not None]\n", " return float(sum(x)) / max(len(x), 1)\n", "\n", "with Workflow(workflow_id='tutorial_05',\n", " name='tutorial_05',\n", " owner='tutorials',\n", " description='Tutorial 5 workflow',\n", " online=False) as w:\n", "\n", " country_node_raw_temp = w.create_node(stream_name='raw_temp_data', channel=M, plates=[C])\n", " country_node_temp = w.create_node(stream_name='temp_data', channel=M, 
plates=[C])\n", " city_node_temp = w.create_node(stream_name='city_temp', channel=M, plates=[CC])\n", " city_node_avg_temp = w.create_node(stream_name='city_avg_temp', channel=M, plates=[CC])\n", " country_node_avg_temp = w.create_node(stream_name='country_avg_temp', channel=M, plates=[C])\n", "\n", " country_node_raw_rain = w.create_node(stream_name='raw_rain_data', channel=M, plates=[C])\n", " city_node_rain = w.create_node(stream_name='city_rain', channel=M, plates=[CC])\n", " country_node_avg_rain = w.create_node(stream_name='country_avg_rain', channel=M, plates=[C])\n", " \n", " city_node_temp_rain = w.create_node(stream_name='city_temp_rain', channel=M, plates=[CC])\n", " country_node_avg_temp_rain = w.create_node(stream_name='country_avg_temp_rain', channel=M, plates=[C])\n", " \n", " world_node_avg_temp = w.create_node(stream_name='world_avg_temp', channel=M, plates=[])\n", "\n", " for c in C:\n", " country_node_raw_temp[c] = hs.plugins.data_importers.factors.csv_multi_reader(\n", " source=None, **csv_temp_params)\n", " country_node_temp[c] = hs.factors.apply(\n", " sources=[country_node_raw_temp[c]],\n", " func=split_temperatures)\n", "\n", " country_node_raw_rain[c] = hs.plugins.data_importers.factors.csv_multi_reader(\n", " source=None, **csv_rain_params)\n", " for cc in CC[c]:\n", " city_node_temp[cc] = hs.factors.splitter_from_stream(\n", " source=country_node_temp[c],\n", " splitting_node=country_node_temp[c],\n", " use_mapping_keys_only=True)\n", " city_node_avg_temp[cc] = hs.factors.apply(\n", " sources=[city_node_temp[c]],\n", " func=dict_mean)\n", "\n", " city_node_rain[cc] = hs.factors.splitter_from_stream(\n", " source=country_node_raw_rain[c],\n", " splitting_node=country_node_raw_rain[c],\n", " use_mapping_keys_only=True)\n", " \n", " city_node_temp_rain[cc] = hs.plugins.example.factors.aligned_correlation(\n", " sources=[city_node_avg_temp[cc],\n", " city_node_rain[cc]],\n", " use_mapping_keys_only=True)\n", "\n", " country_node_avg_temp[c] = 
hs.factors.aggregate(\n", " sources=[city_node_avg_temp],\n", " alignment_node=None,\n", " aggregation_meta_data='city', func=mean)\n", " country_node_avg_rain[c] = hs.factors.aggregate(\n", " sources=[city_node_rain],\n", " alignment_node=None,\n", " aggregation_meta_data='city', func=mean)\n", " country_node_avg_temp_rain[c] = hs.factors.aggregate(\n", " sources=[city_node_temp_rain],\n", " alignment_node=None,\n", " aggregation_meta_data='city', func=mean)\n", " \n", " world_node_avg_temp[None] = hs.factors.aggregate(sources=[country_node_avg_temp],\n", " alignment_node=None,\n", " aggregation_meta_data='country',\n", " func=mean)\n", "\n", " w.execute(ti_all)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## See the temperature and rain in all the cities\n", "\n", "Let's see a small sample of the temperatures in each city. We can use the function **find_streams** to retrieve all the streams whose meta_data matches the keys and values that we specify. In the following example, we find all the streams with the name **temp_data** and print a small sample." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "temp_data: [country=USA]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value={'Houston': {'max': 16.5, 'min': 5.7}, 'LosAngeles': {'max': 18.7, 'min': 7.0}, 'NY': {'max': 8.0, 'min': -0.9}, 'Seattle': {'max': 7.4, 'min': 0.2}, 'Chicago': {'max': 2.1, 'min': -6.1}})]\n", "temp_data: [country=Asia]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value={'KualaLumpur': {'max': 31.8, 'min': 23.7}, 'HongKong': {'max': 19.3, 'min': 13.3}, 'Bangkok': {'max': 33.4, 'min': 23.4}, 'NewDelhi': {'max': 21.7, 'min': 7.0}, 'Tokyo': {'max': 10.9, 'min': 4.6}})]\n", "temp_data: [country=Australia]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value={'Brisbane': {'max': 29.0, 'min': 
20.8}, 'Melbourne': {'max': 28.0, 'min': 16.8}, 'Sydney': {'max': 28.1, 'min': 19.1}, 'GoldCoast': {'max': 30.8, 'min': 21.0}, 'Canberra': {'max': 31.5, 'min': 13.8}})]\n", "temp_data: [country=NZ]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value={'Dunedin': {'max': 19.8, 'min': 9.0}, 'Hamilton': {'max': 23.5, 'min': 13.4}, 'Wellington': {'max': 20.5, 'min': 13.9}, 'Christchurch': {'max': 20.6, 'min': 10.5}, 'Auckland': {'max': 23.0, 'min': 16.0}})]\n" ] } ], "source": [ "ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),\n", " datetime(2007, 2, 1).replace(tzinfo=UTC))\n", "\n", "for stream_id, stream in M.find_streams(name='temp_data').iteritems():\n", " print(stream_id)\n", " print(stream.window(ti_sample).items())" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[city_avg_temp: [country=NZ, city=Christchurch]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=15.55)]\n", "[city_avg_temp: [country=USA, city=Seattle]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=3.8000000000000003)]\n", "[city_avg_temp: [country=Australia, city=GoldCoast]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=25.9)]\n", "[city_avg_temp: [country=Asia, city=Tokyo]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=7.75)]\n", "[city_avg_temp: [country=Asia, city=Bangkok]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=28.4)]\n", "[city_avg_temp: [country=Asia, city=KualaLumpur]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=27.75)]\n", "[city_avg_temp: [country=Asia, city=HongKong]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=16.3)]\n", "[city_avg_temp: [country=Australia, city=Melbourne]]\n", 
"[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=22.4)]\n", "[city_avg_temp: [country=NZ, city=Auckland]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=19.5)]\n", "[city_avg_temp: [country=Asia, city=NewDelhi]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=14.35)]\n", "[city_avg_temp: [country=NZ, city=Wellington]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=17.2)]\n", "[city_avg_temp: [country=Australia, city=Brisbane]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=24.9)]\n", "[city_avg_temp: [country=USA, city=Chicago]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=-1.9999999999999998)]\n", "[city_avg_temp: [country=NZ, city=Dunedin]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=14.4)]\n", "[city_avg_temp: [country=USA, city=NY]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=3.55)]\n", "[city_avg_temp: [country=USA, city=LosAngeles]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=12.85)]\n", "[city_avg_temp: [country=NZ, city=Hamilton]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=18.45)]\n", "[city_avg_temp: [country=Australia, city=Sydney]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=23.6)]\n", "[city_avg_temp: [country=Australia, city=Canberra]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=22.65)]\n", "[city_avg_temp: [country=USA, city=Houston]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=11.1)]\n" ] } ], "source": [ "for stream_id, stream in M.find_streams(name='city_avg_temp').iteritems():\n", " print('[{}]'.format(stream_id))\n", " 
print(stream.window(ti_sample).items())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see the ratio between the temperature and the rain for every month. In this case, we do not have the rain for most of the cities. For that reason, some of the nodes are empty." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[city_temp_rain: [country=NZ, city=Wellington]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=0.22872340425531912)]\n", "[city_temp_rain: [country=Asia, city=HongKong]]\n", "[]\n", "[city_temp_rain: [country=USA, city=Chicago]]\n", "[]\n", "[city_temp_rain: [country=Asia, city=KualaLumpur]]\n", "[]\n", "[city_temp_rain: [country=NZ, city=Christchurch]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=0.6941964285714286)]\n", "[city_temp_rain: [country=NZ, city=Dunedin]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=0.3257918552036199)]\n", "[city_temp_rain: [country=Australia, city=Brisbane]]\n", "[]\n", "[city_temp_rain: [country=USA, city=LosAngeles]]\n", "[]\n", "[city_temp_rain: [country=USA, city=Houston]]\n", "[]\n", "[city_temp_rain: [country=Australia, city=Canberra]]\n", "[]\n", "[city_temp_rain: [country=NZ, city=Hamilton]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=0.16312997347480107)]\n", "[city_temp_rain: [country=Asia, city=Tokyo]]\n", "[]\n", "[city_temp_rain: [country=USA, city=Seattle]]\n", "[]\n", "[city_temp_rain: [country=NZ, city=Auckland]]\n", "[StreamInstance(timestamp=datetime.datetime(2007, 1, 21, 0, 0, tzinfo=), value=0.32608695652173914)]\n", "[city_temp_rain: [country=Asia, city=Bangkok]]\n", "[]\n", "[city_temp_rain: [country=Australia, city=Sydney]]\n", "[]\n", "[city_temp_rain: [country=Asia, city=NewDelhi]]\n", "[]\n", "[city_temp_rain: [country=Australia, 
city=Melbourne]]\n", "[]\n", "[city_temp_rain: [country=USA, city=NY]]\n", "[]\n", "[city_temp_rain: [country=Australia, city=GoldCoast]]\n", "[]\n" ] } ], "source": [ "for stream_id, stream in M.find_streams(name='city_temp_rain').iteritems():\n", " print('[{}]'.format(stream_id))\n", " print(stream.window(ti_sample).items())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualisations\n", "\n", "Here we create a function that extracts the names, timestamps and values of all the streams and returns them in the format expected by the function **plot_multiple_stock**, which is used throughout the tutorials.\n", "\n", "Then, we can find the streams that we want to visualize and plot their values. In the following example, we can see the average temperature of some cities of Australia." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Temperatures in Australia\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "def get_x_y_names_from_streams(streams, tag=None):\n", " names = []\n", " y = []\n", " x = []\n", " for stream_id, stream in streams.iteritems():\n", " if len(stream.window().items()) == 0:\n", " continue\n", " if tag is not None:\n", " meta_data = dict(stream_id.meta_data)\n", " name = meta_data[tag]\n", " else:\n", " name = ''\n", " names.append(name)\n", " y.append([instance.value for instance in stream.window().items()])\n", " x.append([str(instance.timestamp) for instance in stream.window().items()])\n", " return y, x, names\n", "\n", "data, time, names = get_x_y_names_from_streams(M.find_streams(country='Australia', name='city_avg_temp'), 'city')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Temperatures in Australia', ylabel='ºC')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we visualize the average temperatures in some cities of New Zealand." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Temperatures in New Zealand\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(country='NZ', name='city_avg_temp'), 'city')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Temperatures in New Zealand', ylabel='ºC')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The rain-fall in New Zealand." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Rain in New Zealand\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(country='NZ', name='city_rain'), 'city')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Rain in New Zealand', ylabel='some precipitation unit')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And the correlation between temperature and rain of all the cities. In this case, we only have this ratio for some of the cities of New Zealand." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Temperatures in New Zealand\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(name='city_temp_rain'), 'city')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Temperatures in New Zealand', ylabel='Cº/rain units')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see the streams at a country level with the averages of each of its cities." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Temperatures in countries\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(name='country_avg_temp'), 'country')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Temperatures in countries', ylabel='ºC')" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Average rain in countries\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(name='country_avg_rain'), 'country')\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Average rain in countries', ylabel='some precipitation unit')" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Average temperature in all countries\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "data, time, names = get_x_y_names_from_streams(M.find_streams(name='world_avg_temp'))\n", "\n", "plot_multiple_stock(data, time=time, names=names,\n", " title='Average temperature in all countries', ylabel='Cº')" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'factors': [{'id': 'csv_multi_reader',\n", " 'sink': 'raw_temp_data',\n", " 'sources': []},\n", " {'id': 'apply',\n", " 'sink': 'temp_data',\n", " 'sources': ['raw_temp_data']},\n", " {'id': 'csv_multi_reader',\n", " 'sink': 'raw_rain_data',\n", " 'sources': []},\n", " {'id': 'splitter_from_stream',\n", " 'sink': 'city_temp',\n", " 'sources': ['temp_data']},\n", " {'id': 'apply',\n", " 'sink': 'city_avg_temp',\n", " 'sources': ['city_temp']},\n", " {'id': 'splitter_from_stream',\n", " 'sink': 'city_rain',\n", " 'sources': ['raw_rain_data']},\n", " {'id': 'aligned_correlation',\n", " 'sink': 'city_temp_rain',\n", " 'sources': ['city_avg_temp', 'city_rain']},\n", " {'id': 'aggregate',\n", " 'sink': 'country_avg_temp',\n", " 'sources': ['city_avg_temp']},\n", " {'id': 'aggregate',\n", " 'sink': 'country_avg_rain',\n", " 'sources': ['city_rain']},\n", " {'id': 'aggregate',\n", " 'sink': 'country_avg_temp_rain',\n", " 'sources': ['city_temp_rain']},\n", " {'id': 'aggregate',\n", " 'sink': 'world_avg_temp',\n", " 'sources': ['country_avg_temp']}],\n", " 'nodes': [{'id': 'city_avg_temp'},\n", " {'id': 'country_avg_temp'},\n", " {'id': 'country_avg_rain'},\n", " {'id': 'city_rain'},\n", " {'id': 'raw_rain_data'},\n", " {'id': 'country_avg_temp_rain'},\n", " {'id': 'city_temp_rain'},\n", " {'id': 'temp_data'},\n", " {'id': 'city_temp'},\n", " {'id': 'world_avg_temp'},\n", " {'id': 'raw_temp_data'}],\n", " 'plates': {u'C': [{'id': 'country_avg_temp', 'type': 'node'},\n", " {'id': 'country_avg_rain', 
'type': 'node'},\n", " {'id': 'raw_rain_data', 'type': 'node'},\n", " {'id': 'country_avg_temp_rain', 'type': 'node'},\n", " {'id': 'temp_data', 'type': 'node'},\n", " {'id': 'raw_temp_data', 'type': 'node'},\n", " {'id': 'apply', 'type': 'factor'},\n", " {'id': 'aggregate', 'type': 'factor'},\n", " {'id': 'aggregate', 'type': 'factor'},\n", " {'id': 'aggregate', 'type': 'factor'}],\n", " u'C.C': [{'id': 'city_avg_temp', 'type': 'node'},\n", " {'id': 'city_rain', 'type': 'node'},\n", " {'id': 'city_temp_rain', 'type': 'node'},\n", " {'id': 'city_temp', 'type': 'node'},\n", " {'id': 'apply', 'type': 'factor'},\n", " {'id': 'aligned_correlation', 'type': 'factor'}],\n", " 'root': [{'id': 'aggregate', 'type': 'factor'}]}}\n" ] } ], "source": [ "from pprint import pprint\n", "pprint(w.to_dict(tool_long_names=False))" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\n", " \"nodes\": [\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"city_avg_temp\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"country_avg_temp\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"country_avg_rain\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"city_rain\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"raw_rain_data\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"country_avg_temp_rain\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"city_temp_rain\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"temp_data\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"city_temp\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"world_avg_temp\"\n", " },\n", " {\n", " \"type\": \"rv\",\n", " \"id\": \"raw_temp_data\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"csv_multi_reader\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"apply\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": 
\"csv_multi_reader\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"splitter_from_stream\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"apply\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"splitter_from_stream\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"aligned_correlation\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"aggregate\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"aggregate\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"aggregate\"\n", " },\n", " {\n", " \"type\": \"fac\",\n", " \"id\": \"aggregate\"\n", " }\n", " ],\n", " \"links\": [\n", " {\n", " \"source\": \"csv_multi_reader\",\n", " \"target\": \"raw_temp_data\"\n", " },\n", " {\n", " \"source\": \"raw_temp_data\",\n", " \"target\": \"apply\"\n", " },\n", " {\n", " \"source\": \"apply\",\n", " \"target\": \"temp_data\"\n", " },\n", " {\n", " \"source\": \"csv_multi_reader\",\n", " \"target\": \"raw_rain_data\"\n", " },\n", " {\n", " \"source\": \"temp_data\",\n", " \"target\": \"splitter_from_stream\"\n", " },\n", " {\n", " \"source\": \"splitter_from_stream\",\n", " \"target\": \"city_temp\"\n", " },\n", " {\n", " \"source\": \"city_temp\",\n", " \"target\": \"apply\"\n", " },\n", " {\n", " \"source\": \"apply\",\n", " \"target\": \"city_avg_temp\"\n", " },\n", " {\n", " \"source\": \"raw_rain_data\",\n", " \"target\": \"splitter_from_stream\"\n", " },\n", " {\n", " \"source\": \"splitter_from_stream\",\n", " \"target\": \"city_rain\"\n", " },\n", " {\n", " \"source\": \"city_avg_temp\",\n", " \"target\": \"aligned_correlation\"\n", " },\n", " {\n", " \"source\": \"city_rain\",\n", " \"target\": \"aligned_correlation\"\n", " },\n", " {\n", " \"source\": \"aligned_correlation\",\n", " \"target\": \"city_temp_rain\"\n", " },\n", " {\n", " \"source\": \"city_avg_temp\",\n", " \"target\": \"aggregate\"\n", " },\n", " {\n", " \"source\": \"aggregate\",\n", " \"target\": \"country_avg_temp\"\n", " },\n", " {\n", 
" \"source\": \"city_rain\",\n", " \"target\": \"aggregate\"\n", " },\n", " {\n", " \"source\": \"aggregate\",\n", " \"target\": \"country_avg_rain\"\n", " },\n", " {\n", " \"source\": \"city_temp_rain\",\n", " \"target\": \"aggregate\"\n", " },\n", " {\n", " \"source\": \"aggregate\",\n", " \"target\": \"country_avg_temp_rain\"\n", " },\n", " {\n", " \"source\": \"country_avg_temp\",\n", " \"target\": \"aggregate\"\n", " },\n", " {\n", " \"source\": \"aggregate\",\n", " \"target\": \"world_avg_temp\"\n", " }\n", " ]\n", "}\n" ] } ], "source": [ "print(w.to_json(w.factorgraph_viz, tool_long_names=False, indent=4))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }