{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# HyperStream Tutorial 2: Your own tools\n", "\n", "HyperStream has a set of predefined tools in hyperstream.tools. However, it is also possible to define your own tools and factors. In this tutorial, we show how to create a simple plugin that reads a CSV file. The tool has already been created and configured so that it works out of the box; we describe how it was created and how you can create a new one.\n", "\n", "## Creating a plugin tool to read CSV files\n", "\n", "### 1. Create a folder in plugins\n", "\n", "First, we need to create a new folder to contain the new tool. This folder must be inside the __plugins__ folder; in this example it is __plugins/example/tools/csv_reader/__. Every subfolder also needs an **\\__init\\__.py** file so that Python treats it as a package.\n", "\n", "    plugins/\n", "     |- __init__.py\n", "     |- one_plugin/\n", "     |   |- __init__.py\n", "     |   |- tools/\n", "     |       |- __init__.py\n", "     |       |- tool_a/\n", "     |           |- __init__.py\n", "     |           |- 2017-06-20_v0.0.1.py\n", "     |           |- 2017-06-22_v0.0.3.py\n", "     |- another_plugin/\n", "         |- __init__.py\n", "         |- tools/\n", "             |- __init__.py\n", "             |- tool_b/\n", "             |   |- __init__.py\n", "             |   |- 2017-06-20_v0.0.1.py\n", "             |   |- 2017-06-22_v0.1.0.py\n", "             |- tool_c/\n", "                 |- __init__.py\n", "                 |- 2017-06-20_v0.0.2.py\n", "\n", "### 2. Write the plugin in Python\n", "\n", "Then, we need to create a new Python file following the naming convention `<year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py`. 
In this example, the file **./plugins/example/tools/csv_reader/2017-06-20_v0.0.1.py** has the following content:\n", "\n", "```Python\n", "from hyperstream import Tool, StreamInstance\n", "from hyperstream.utils import check_input_stream_count\n", "\n", "from dateutil.parser import parse\n", "\n", "\n", "class CsvReader(Tool):\n", "    def __init__(self, filename):\n", "        super(CsvReader, self).__init__(filename=filename)\n", "\n", "    @check_input_stream_count(0)\n", "    def _execute(self, sources, alignment_stream, interval):\n", "\n", "        # Let's make the assumption that the first field is the timestamp\n", "\n", "        first = True\n", "\n", "        with open(self.filename, 'rU') as f:\n", "            for line in f.readlines():\n", "                if first:\n", "                    first = False\n", "                    continue\n", "                elements = line.split(',')\n", "                dt = parse(elements[0])\n", "                if dt in interval:\n", "                    yield StreamInstance(dt, map(float, elements[1:]))\n", "\n", "```\n", "\n", "### 3. Add HyperStream configuration\n", "\n", "Next, we need to add information about this plugin to **hyperstream_config.json**. 
In particular, the following fields must be added to the plugins section:\n", "\n", "- channel_id_prefix: prefix used to create Channels (explained in another tutorial).\n", "- channel_names: a list of available Channels.\n", "- path: the path to the new plugin.\n", "- has_tools: whether the plugin contains tools.\n", "- has_assets: whether the plugin contains folders or files that it needs (assets).\n", "\n", "The following is an example of a configuration file that includes the new plugin:\n", "\n", "```json\n", "{\n", "    \"mongo\": {\n", "        \"host\": \"localhost\",\n", "        \"port\": 27017,\n", "        \"tz_aware\": true,\n", "        \"db\": \"hyperstream\"\n", "    },\n", "    \"plugins\": [{\n", "        \"channel_id_prefix\": \"example\",\n", "        \"channel_names\": [],\n", "        \"path\": \"plugins/example\",\n", "        \"has_tools\": true,\n", "        \"has_assets\": false\n", "    }],\n", "    \"online_engine\": {\n", "        \"interval\": {\n", "            \"start\": -60,\n", "            \"end\": -10\n", "        },\n", "        \"sleep\": 5,\n", "        \"iterations\": 100\n", "    }\n", "}\n", "\n", "```\n", "\n", "## Using the new tool\n", "\n", "Now we can write some HyperStream code that uses the new plugin." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPython 2.7.6\n", "IPython 5.3.0\n", "\n", "hyperstream 0.3.0-beta\n", "\n", "compiler : GCC 4.8.4\n", "system : Linux\n", "release : 3.19.0-80-generic\n", "machine : x86_64\n", "processor : x86_64\n", "CPU cores : 4\n", "interpreter: 64bit\n", "Git hash : f0e911526041b91fe7999a8968c80618d410e741\n", "HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n" ] } ], "source": [ "%load_ext watermark\n", "\n", "import sys\n", "sys.path.append(\"../\") # Add the parent directory to the path\n", "\n", "from hyperstream import HyperStream\n", "from hyperstream import TimeInterval\n", "from hyperstream.utils import UTC\n", "\n", "from datetime import datetime\n", "\n", "from utils import plot_high_chart\n", "from utils import plot_multiple_stock\n", "\n", "%watermark -v -m -p hyperstream -g\n", "\n", "hs = HyperStream(loglevel=20)\n", "print hs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading the new plugin\n", "\n", "After instantiating HyperStream, and provided that the plugin and its configuration are in the right place, we can load the new tool **csv_reader**, specifying the path to the input file.\n", "\n", "The data is the Polar Ice data set, which can be found at [this link][1].\n", "\n", "[1]: http://new.censusatschool.org.nz/resource/time-series-data-sets-2012/" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a stream\n", "\n", "Now we can create a stream to store all the results in memory." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [], "source": [ "sea_ice_stream = hs.channel_manager.memory.get_or_create_stream(\"sea_ice\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Execute the tool\n", "\n", "We can now execute the tool over the time interval of interest." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "TimeIntervals([TimeInterval(start=datetime.datetime(1990, 1, 1, 0, 0, tzinfo=), end=datetime.datetime(2011, 11, 1, 0, 0, tzinfo=))])" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 11, 1).replace(tzinfo=UTC))\n", "\n", "reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)\n", "\n", "sea_ice_stream.calculated_intervals" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query the stream\n", "\n", "Finally, we can query the stream for a specific period of time; the results of executing the tool are already stored in it. Note that the output starts at 1995-02-01 and ends at 1996-01-01, so the query interval appears to exclude its start and include its end." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1995-02-01 00:00:00+00:00]: [13.3, 2.12]\n", "[1995-03-01 00:00:00+00:00]: [13.28, 2.74]\n", "[1995-04-01 00:00:00+00:00]: [12.32, 5.35]\n", "[1995-05-01 00:00:00+00:00]: [10.76, 8.23]\n", "[1995-06-01 00:00:00+00:00]: [8.86, 10.37]\n", "[1995-07-01 00:00:00+00:00]: [6.05, 12.47]\n", "[1995-08-01 00:00:00+00:00]: [4.61, 14.16]\n", "[1995-09-01 00:00:00+00:00]: [4.38, 14.42]\n", "[1995-10-01 00:00:00+00:00]: [5.91, 13.47]\n", "[1995-11-01 00:00:00+00:00]: [8.95, 11.38]\n", "[1995-12-01 00:00:00+00:00]: [11.02, 7.03]\n", "[1996-01-01 00:00:00+00:00]: [12.07, 3.43]\n" ] } ], "source": [ "ti = TimeInterval(datetime(1995, 1, 1).replace(tzinfo=UTC), datetime(1996, 1, 1).replace(tzinfo=UTC))\n", "\n", "for key, 
value in sea_ice_stream.window(ti).items():\n", " print '[%s]: %s' % (key, value)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize the whole interval\n", "\n", "We can now visualize one of the values in the stream, in this case the second one, which corresponds to Antarctica." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Sea level in the Antarctica\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "my_time, my_data = zip(*[(key.__str__(), value[1]) for key, value in sea_ice_stream.window().items()])\n", "\n", "plot_high_chart(my_time, my_data, type=\"high_stock\", title='Sea level in the Antarctica', yax='meters')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also visualize both values in the stream, for the Arctic and for Antarctica:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Sea level\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "time = [key.__str__() for key, value in sea_ice_stream.window().items()]\n", "data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window().items()])]\n", "htype= 'spline'\n", "names = ['Arctic', 'Antarctica']\n", " \n", "plot_multiple_stock(data, time=time, names=names, htype=htype, title='Sea level', ylabel='meters')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize a specific interval\n", "\n", "We can also visualize only the shorter time interval specified above." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " test multi-output\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "time = [key.__str__() for key, value in sea_ice_stream.window(ti).items()]\n", "data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window(ti).items()])]\n", "htype= 'spline'\n", "names = ['Arctic', 'Antarctica']\n", " \n", "plot_multiple_stock(data, time=time, names=names, htype=htype, title='test multi-output', ylabel='meters')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }