{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# HyperStream Tutorial 4: Real-time streams\n", "\n", "In this tutorial, we show how to create a new plugin that collects real-time data using a publicly available API. In this case, we use the [Environment Agency flood-monitoring API][1].\n", "\n", "## Creating a plugin tool to use the API\n", "\n", "### 1. Create a folder in plugins\n", "\n", "First, we need to create a new folder to contain the new tool. The new folder must be inside the __plugins__ folder; in this example it is __plugins/example/tools/environment_data_gov_uk/__. We also need to create an **\\__init\\__.py** file in every subfolder.\n", "\n", "    plugins/\n", "        |- __init__.py\n", "        |- example/\n", "            |- __init__.py\n", "            |- tools/\n", "                |- __init__.py\n", "                |- environment_data_gov_uk\n", "                    |- __init__.py\n", "                    |- 2017-06-21_v0.0.1.py\n", "\n",
"### 2. Write the plugin in Python\n", "\n", "As we have seen in a previous tutorial, we can create a new plugin in Python. In this case, the plugin code in **./plugins/example/tools/environment_data_gov_uk/2017-06-21_v0.0.1.py** uses the API to query the readings of a single water station for the specified time interval:\n", "\n", "```Python\n", "from datetime import datetime, timedelta\n", "\n", "from hyperstream import Tool, StreamInstance, StreamInstanceCollection\n", "from hyperstream.utils import check_input_stream_count\n", "from hyperstream.utils import UTC\n", "\n", "from dateutil.parser import parse\n", "\n", "import urllib\n", "import urllib2\n", "import json\n", "\n", "# This uses Environment Agency flood and river level data from the real-time\n", "# data API (Beta)\n", "# For questions on the APIs please contact data.info@environment-agency.gov.uk,\n", "# a forum for announcements and discussion is under consideration.\n", "class EnvironmentDataGovUk(Tool):\n", "    def __init__(self, station):\n", "        self.station = station\n", "        super(EnvironmentDataGovUk, self).__init__()\n", "\n", "    @check_input_stream_count(0)\n", "    def _execute(self, sources, alignment_stream, interval):\n", "        # The query parameters are dates, so format the interval bounds as YYYY-MM-DD\n", "        startdate = interval[0].strftime(\"%Y-%m-%d\")\n", "        enddate = interval[1].strftime(\"%Y-%m-%d\")\n", "\n", "        url = \"https://environment.data.gov.uk/flood-monitoring/id/stations/{}/readings\".format(self.station)\n", "        values = {'startdate': startdate,\n", "                  'enddate': enddate}\n", "        url_parameters = urllib.urlencode(values)\n", "\n", "        full_url = url + '?' + url_parameters\n", "        response = urllib2.urlopen(full_url)\n", "        data = json.load(response)\n", "\n", "        # Keep only the readings whose timestamp falls inside the requested interval\n", "        for item in data['items']:\n", "            dt = parse(item.get('dateTime'))\n", "            if dt in interval:\n", "                value = float(item.get('value'))\n", "                yield StreamInstance(dt, value)\n", "\n", "```\n", "\n",
"### 3. Add HyperStream configuration\n", "\n", "Now, it is necessary to add information about this plugin to **hyperstream_config.json**. In particular, we need to add the following fields in the **plugins** section:\n", "\n", "- channel_id_prefix: The prefix used to create Channels (explained in another tutorial).\n", "- channel_names: A list of available Channels.\n", "- path: The path to the new plugin.\n", "- has_tools: Whether the new plugin has tools.\n", "- has_assets: Whether the plugin contains folders or files that it needs.\n", "\n", "The following is an example of a configuration file that includes the new plugin:\n", "\n", "```json\n", "{\n", "    \"mongo\": {\n", "        \"host\": \"localhost\",\n", "        \"port\": 27017,\n", "        \"tz_aware\": true,\n", "        \"db\": \"hyperstream\"\n", "    },\n", "    \"plugins\": [{\n", "        \"channel_id_prefix\": \"example\",\n", "        \"channel_names\": [],\n", "        \"path\": \"plugins/example\",\n", "        \"has_tools\": true,\n", "        \"has_assets\": false\n", "    }],\n", "    \"online_engine\": {\n", "        \"interval\": {\n", "            \"start\": -60,\n", "            \"end\": -10\n", "        },\n", "        \"sleep\": 5,\n", "        \"iterations\": 100\n", "    }\n", "}\n", "\n", "```\n", "\n", "\n", "### Acknowledgement\n", "\n", "    This uses Environment Agency flood and river level data from the real-time data API (Beta)\n", "\n", "[1]: https://environment.data.gov.uk/flood-monitoring/doc/reference#introduction" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPython 2.7.6\n", "IPython 5.3.0\n", "\n", "hyperstream 0.3.0-beta\n", "\n", "compiler : GCC 4.8.4\n", "system : Linux\n", "release : 3.19.0-80-generic\n", "machine : x86_64\n", "processor : x86_64\n", "CPU cores : 4\n", "interpreter: 64bit\n", "Git hash : f0e911526041b91fe7999a8968c80618d410e741\n" ] } ], "source": [ "%load_ext watermark\n", "\n", "import sys\n", "from datetime import datetime, timedelta\n", "\n", "sys.path.append(\"../\") # Add the parent directory to the path\n", "\n", "from hyperstream import HyperStream, StreamId\n", "from hyperstream import TimeInterval\n", "from hyperstream.utils import UTC\n", "\n", "from utils import plot_high_chart\n", "\n", "%watermark -v -m -p hyperstream -g" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Select the water station\n", "\n", "For our example, we will query a water station called Bristol Avon Little Avon Axe and North Somerset St. This station has the station number 531118. It is possible to select another station by changing the station_number; a list of 50 available stations can be found by following [this link][2].\n", "\n", "[2]: https://environment.data.gov.uk/flood-monitoring/id/stations?_limit=50" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "station_number = \"531118\"\n", "station_name = \"Bristol Avon Little Avon Axe and North Somerset St\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Tool and Stream\n", "\n", "First, we create a Stream to store the data and an instance of the new tool."
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n" ] } ], "source": [ "hs = HyperStream(loglevel=20)\n", "print hs\n", "\n", "environment_stream = hs.channel_manager.memory.get_or_create_stream(\"environment\")\n", "environment_tool = hs.plugins.example.tools.environment_data_gov_uk(station=station_number)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute the tool\n", "\n", "Now we specify the time interval for which we want the water levels; in this case, the last 7 days. We then execute the tool for that interval, and the results are stored in the specified Stream." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "now = datetime.utcnow().replace(tzinfo=UTC)\n", "before = now - timedelta(weeks=1)  # already timezone-aware, because now is\n", "ti = TimeInterval(before, now)\n", "\n", "environment_tool.execute(sources=[], sink=environment_stream, interval=ti)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualization\n", "\n", "Now we can visualize all the data stored in the Stream." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " Bristol Avon Little Avon Axe and North Somerset St\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", "\n", " \n", "\n", " \n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "my_time, my_data = zip(*[(str(key), value) for key, value in environment_stream.window().items()])\n", "\n", "plot_high_chart(my_time, my_data, type=\"high_stock\", title=station_name, yax='meters')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }