{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"# HyperStream Tutorial 4: Real-time streams\n",
"\n",
"In this tutorial, we show how to create a new plugin that collects real-time data using a publicly available API. In this case, we use the [Environment Agency flood-monitoring API][1].\n",
"\n",
"## Creating a plugin tool to use the API\n",
"\n",
"### 1. Create a folder in plugins\n",
"\n",
"First, create a new folder to contain the tool. It must be inside the __plugins__ folder; in this example it is __plugins/example/tools/environment_data_gov_uk/__. Every folder along that path also needs an **\\__init\\__.py** file so that Python treats it as a package:\n",
"\n",
"```\n",
"plugins/\n",
"|- __init__.py\n",
"|- example/\n",
"   |- __init__.py\n",
"   |- tools/\n",
"      |- __init__.py\n",
"      |- environment_data_gov_uk/\n",
"         |- __init__.py\n",
"         |- 2017-06-21_v0.0.1.py\n",
"```\n",
"\n",
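"The layout above can also be created programmatically. The snippet below is a minimal sketch, not part of HyperStream: `create_plugin_layout` is a hypothetical helper that builds the folders, adds the `__init__.py` files and leaves the tool file empty, to be filled in the next step. It uses only the standard library and runs under Python 2 and 3:\n",
"\n",
"```python\n",
"import os\n",
"\n",
"\n",
"def create_plugin_layout(root, plugin, tool, tool_file):\n",
"    # Hypothetical helper: create root/<plugin>/tools/<tool>/ and put an\n",
"    # empty __init__.py in every folder so Python treats it as a package.\n",
"    folders = [root,\n",
"               os.path.join(root, plugin),\n",
"               os.path.join(root, plugin, 'tools'),\n",
"               os.path.join(root, plugin, 'tools', tool)]\n",
"    for folder in folders:\n",
"        if not os.path.isdir(folder):\n",
"            os.makedirs(folder)\n",
"        init_file = os.path.join(folder, '__init__.py')\n",
"        if not os.path.exists(init_file):\n",
"            open(init_file, 'a').close()\n",
"    # Create the (still empty) versioned tool module\n",
"    tool_path = os.path.join(folders[-1], tool_file)\n",
"    if not os.path.exists(tool_path):\n",
"        open(tool_path, 'a').close()\n",
"    return tool_path\n",
"```\n",
"\n",
"For this tutorial the call would be `create_plugin_layout('plugins', 'example', 'environment_data_gov_uk', '2017-06-21_v0.0.1.py')`, run from the folder that contains `plugins/`. Existing files are left untouched, so running it twice is harmless.\n",
"\n",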
"### 2. Write the plugin in Python\n",
"\n",
"As seen in a previous tutorial, plugins are written in Python. The code of this tool, **./plugins/example/tools/environment_data_gov_uk/2017-06-21_v0.0.1.py**, uses the API to query the readings of a single monitoring station over the requested time interval and yields each reading as a `StreamInstance`:\n",
"\n",
"```python\n",
"import json\n",
"import urllib\n",
"import urllib2\n",
"\n",
"from dateutil.parser import parse\n",
"\n",
"from hyperstream import Tool, StreamInstance\n",
"from hyperstream.utils import check_input_stream_count\n",
"\n",
"\n",
"# this uses Environment Agency flood and river level data from the real-time\n",
"# data API (Beta)\n",
"# For questions on the APIs please contact data.info@environment-agency.gov.uk,\n",
"# a forum for announcements and discussion is under consideration.\n",
"class EnvironmentDataGovUk(Tool):\n",
"    def __init__(self, station):\n",
"        # Station identifier used in the API URL, e.g. 531118\n",
"        self.station = station\n",
"        super(EnvironmentDataGovUk, self).__init__()\n",
"\n",
"    @check_input_stream_count(0)  # this tool reads no input streams\n",
"    def _execute(self, sources, alignment_stream, interval):\n",
"        # The API filters by calendar date, so only the date part is sent\n",
"        startdate = interval[0].strftime(\"%Y-%m-%d\")\n",
"        enddate = interval[1].strftime(\"%Y-%m-%d\")\n",
"\n",
"        url = \"https://environment.data.gov.uk/flood-monitoring/id/stations/{}/readings\".format(self.station)\n",
"        values = {'startdate': startdate,\n",
"                  'enddate': enddate}\n",
"        url_parameters = urllib.urlencode(values)\n",
"\n",
"        full_url = url + '?' + url_parameters\n",
"        response = urllib2.urlopen(full_url)\n",
"        data = json.load(response)\n",
"\n",
"        # Whole days are returned; keep only readings inside the interval\n",
"        for item in data['items']:\n",
"            dt = parse(item.get('dateTime'))\n",
"            if dt in interval:\n",
"                value = float(item.get('value'))\n",
"                yield StreamInstance(dt, value)\n",
"```\n",
"\n",
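"The request and response handling can be tried without HyperStream or network access. The following is a minimal sketch, not the tool itself: `readings_url` rebuilds the URL that `_execute` requests, and `parse_readings` is a simplified, hypothetical stand-in for the parsing loop that uses `datetime.strptime` instead of `dateutil` and assumes UTC timestamps of the form `2017-06-21T10:15:00Z`:\n",
"\n",
"```python\n",
"import json\n",
"from datetime import datetime\n",
"\n",
"try:  # Python 3\n",
"    from urllib.parse import urlencode\n",
"except ImportError:  # Python 2, as used in this tutorial\n",
"    from urllib import urlencode\n",
"\n",
"READINGS_URL = ('https://environment.data.gov.uk/flood-monitoring'\n",
"                '/id/stations/{}/readings')\n",
"\n",
"\n",
"def readings_url(station, start, end):\n",
"    # Only the calendar dates are sent, so the API works in whole days;\n",
"    # the tool then keeps the readings that fall inside the interval.\n",
"    params = [('startdate', start.strftime('%Y-%m-%d')),\n",
"              ('enddate', end.strftime('%Y-%m-%d'))]\n",
"    return READINGS_URL.format(station) + '?' + urlencode(params)\n",
"\n",
"\n",
"def parse_readings(payload):\n",
"    # Convert a JSON response body into (timestamp, value) pairs.\n",
"    # Assumes timestamps such as '2017-06-21T10:15:00Z' (UTC).\n",
"    data = json.loads(payload)\n",
"    return [(datetime.strptime(item['dateTime'], '%Y-%m-%dT%H:%M:%SZ'),\n",
"             float(item['value']))\n",
"            for item in data['items']]\n",
"```\n",
"\n",
"For example, an interval from 2017-06-14 10:30 to 2017-06-21 10:30 gives `...?startdate=2017-06-14&enddate=2017-06-21`, so readings taken on 14 June before 10:30 can be returned by the API and are then discarded by the `dt in interval` check.\n",
"\n",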
"### 3. Add HyperStream configuration\n",
"\n",
"Next, the plugin must be registered in **hyperstream_config.json** by adding an entry with the following fields to the `plugins` list:\n",
"\n",
"- `channel_id_prefix`: prefix used to create the plugin's Channels (explained in another tutorial)\n",
"- `channel_names`: list of Channels the plugin provides\n",
"- `path`: path to the plugin folder\n",
"- `has_tools`: whether the plugin contains tools\n",
"- `has_assets`: whether the plugin contains folders or files that it needs to run\n",
"\n",
"The following example configuration file includes the new plugin:\n",
"\n",
"```json\n",
"{\n",
" \"mongo\": {\n",
" \"host\": \"localhost\",\n",
" \"port\": 27017,\n",
" \"tz_aware\": true,\n",
" \"db\": \"hyperstream\"\n",
" },\n",
" \"plugins\": [{\n",
" \"channel_id_prefix\": \"example\",\n",
" \"channel_names\": [],\n",
" \"path\": \"plugins/example\",\n",
" \"has_tools\": true,\n",
" \"has_assets\": false\n",
" }],\n",
" \"online_engine\": {\n",
" \"interval\": {\n",
" \"start\": -60,\n",
" \"end\": -10\n",
" },\n",
" \"sleep\": 5,\n",
" \"iterations\": 100\n",
" }\n",
"}\n",
"\n",
"```\n",
"\n",
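"Before starting HyperStream it can help to check that each plugin entry has all of the fields listed above. This is a minimal sketch, not part of HyperStream; `check_plugin_entries` and `check_config_file` are hypothetical helpers, and the file is assumed to be `hyperstream_config.json` in the current directory:\n",
"\n",
"```python\n",
"import json\n",
"\n",
"# Fields described above for every entry in the 'plugins' list\n",
"REQUIRED_PLUGIN_KEYS = ('channel_id_prefix', 'channel_names', 'path',\n",
"                        'has_tools', 'has_assets')\n",
"\n",
"\n",
"def check_plugin_entries(config):\n",
"    # Return a list of problems; an empty list means every entry is complete\n",
"    problems = []\n",
"    for index, plugin in enumerate(config.get('plugins', [])):\n",
"        for key in REQUIRED_PLUGIN_KEYS:\n",
"            if key not in plugin:\n",
"                problems.append('plugin {}: missing {}'.format(index, key))\n",
"    return problems\n",
"\n",
"\n",
"def check_config_file(path='hyperstream_config.json'):\n",
"    # Load the configuration file and check its plugin entries\n",
"    with open(path) as config_file:\n",
"        return check_plugin_entries(json.load(config_file))\n",
"```\n",
"\n",
"With the example configuration above, `check_config_file()` returns an empty list.\n",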
"\n",
"### Acknowledgement\n",
"\n",
"This uses Environment Agency flood and river level data from the real-time data API (Beta).\n",
"\n",
"[1]: https://environment.data.gov.uk/flood-monitoring/doc/reference#introduction"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPython 2.7.6\n",
"IPython 5.3.0\n",
"\n",
"hyperstream 0.3.0-beta\n",
"\n",
"compiler : GCC 4.8.4\n",
"system : Linux\n",
"release : 3.19.0-80-generic\n",
"machine : x86_64\n",
"processor : x86_64\n",
"CPU cores : 4\n",
"interpreter: 64bit\n",
"Git hash : f0e911526041b91fe7999a8968c80618d410e741\n"
]
}
],
"source": [
"%load_ext watermark\n",
"\n",
"import sys\n",
"from datetime import datetime, timedelta\n",
"\n",
"sys.path.append(\"../\")  # Add the parent directory to the module search path\n",
"\n",
"from hyperstream import HyperStream, StreamId\n",
"from hyperstream import TimeInterval\n",
"from hyperstream.utils import UTC\n",
"\n",
"from utils import plot_high_chart\n",
"\n",
"%watermark -v -m -p hyperstream -g"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Select the water station\n",
"\n",
"For our example, we query the water station with reference number 531118, called \"Bristol Avon Little Avon Axe and North Somerset St\". To query a different station, change `station_number`; [this link][2] lists the first 50 stations returned by the API.\n",
"\n",
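"The same list can be retrieved from a script. This is a minimal offline sketch: `stations_url` rebuilds the link above, and `list_stations` assumes that each item in the JSON response has `stationReference` and `label` fields (both are hypothetical helper names, not part of HyperStream):\n",
"\n",
"```python\n",
"import json\n",
"\n",
"try:  # Python 3\n",
"    from urllib.parse import urlencode\n",
"except ImportError:  # Python 2\n",
"    from urllib import urlencode\n",
"\n",
"STATIONS_URL = 'https://environment.data.gov.uk/flood-monitoring/id/stations'\n",
"\n",
"\n",
"def stations_url(limit=50):\n",
"    # Query that returns at most `limit` stations, like the link above\n",
"    return STATIONS_URL + '?' + urlencode([('_limit', limit)])\n",
"\n",
"\n",
"def list_stations(payload):\n",
"    # Extract (station reference, label) pairs from a JSON response body;\n",
"    # .get() returns None for a missing field instead of raising KeyError\n",
"    data = json.loads(payload)\n",
"    return [(item.get('stationReference'), item.get('label'))\n",
"            for item in data['items']]\n",
"```\n",
"\n",
"The first element of each pair is assumed to be the identifier to assign to `station_number`.\n",
"\n",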
"[2]: https://environment.data.gov.uk/flood-monitoring/id/stations?_limit=50"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"station_number = \"531118\"\n",
"station_name = \"Bristol Avon Little Avon Axe and North Somerset St\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tool and Stream\n",
"\n",
"First, we create a Stream in the memory channel to store the data, and an instance of the new tool for the selected station."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n"
]
}
],
"source": [
"hs = HyperStream(loglevel=20)\n",
"print hs\n",
"\n",
"environment_stream = hs.channel_manager.memory.get_or_create_stream(\"environment\")\n",
"environment_tool = hs.plugins.example.tools.environment_data_gov_uk(station=station_number)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute the tool\n",
"\n",
"Next, we specify the time interval for which we want the water levels, in this case the last 7 days, and execute the tool over that interval. The readings are stored in the Stream passed as `sink`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"now = datetime.utcnow().replace(tzinfo=UTC)\n",
"before = now - timedelta(weeks=1)  # keeps the UTC tzinfo of now\n",
"ti = TimeInterval(before, now)\n",
"\n",
"environment_tool.execute(sources=[], sink=environment_stream, interval=ti)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Visualization\n",
"\n",
"Finally, we plot all the data stored in the stream."
]
},
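{
"cell_type": "markdown",
"metadata": {},
"source": [
"The plotting cell below turns the stream contents into two parallel sequences, one of time stamps (as strings) and one of values, using `zip(*...)`. The idea can be seen on plain data; in this sketch `items` is a hypothetical stand-in for `environment_stream.window().items()`, assumed to yield `(timestamp, value)` pairs:\n",
"\n",
"```python\n",
"from datetime import datetime\n",
"\n",
"# Hypothetical stand-in for environment_stream.window().items()\n",
"items = [(datetime(2017, 6, 21, 10, 0), 0.25),\n",
"         (datetime(2017, 6, 21, 10, 15), 0.26)]\n",
"\n",
"# zip(*pairs) transposes the list of pairs into a tuple of times and a tuple of values\n",
"my_time, my_data = zip(*[(str(key), value) for key, value in items])\n",
"```\n",
"\n",
"Here `my_time` is `('2017-06-21 10:00:00', '2017-06-21 10:15:00')` and `my_data` is `(0.25, 0.26)`. If the stream is empty, `zip` produces nothing and the unpacking raises `ValueError`, so it is worth checking that the tool returned data before plotting."
]
},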
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<p>Plot: Bristol Avon Little Avon Axe and North Somerset St (y-axis: meters)</p>"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"my_time, my_data = zip(*[(key.__str__(), value) for key, value in environment_stream.window().items()])\n",
"\n",
"plot_high_chart(my_time, my_data, type=\"high_stock\", title=station_name, yax='meters')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}