{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"# HyperStream Tutorial 3: Stream composition\n",
"\n",
"We will use the tool created in the previous tutorial and compose the output of its stream with a new one."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPython 2.7.6\n",
"IPython 5.3.0\n",
"\n",
"hyperstream 0.3.0-beta\n",
"\n",
"compiler : GCC 4.8.4\n",
"system : Linux\n",
"release : 3.19.0-80-generic\n",
"machine : x86_64\n",
"processor : x86_64\n",
"CPU cores : 4\n",
"interpreter: 64bit\n",
"Git hash : f0e911526041b91fe7999a8968c80618d410e741\n"
]
}
],
"source": [
"%load_ext watermark\n",
"\n",
"import sys\n",
"from datetime import datetime\n",
"\n",
"sys.path.append(\"../\") # Add the parent directory to the module search path\n",
"\n",
"from hyperstream import HyperStream\n",
"from hyperstream import TimeInterval\n",
"from hyperstream.utils import UTC\n",
"\n",
"from utils import plot_high_chart\n",
"\n",
"%watermark -v -m -p hyperstream -g"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n",
"[1990-02-01 00:00:00+00:00]: [13.33, 2.15]\n",
"[1990-03-01 00:00:00+00:00]: [13.44, 2.71]\n",
"[1990-04-01 00:00:00+00:00]: [12.16, 5.1]\n",
"[1990-05-01 00:00:00+00:00]: [10.84, 7.37]\n",
"[1990-06-01 00:00:00+00:00]: [9.12, 10.26]\n",
"[1990-07-01 00:00:00+00:00]: [6.44, 12.17]\n",
"[1990-08-01 00:00:00+00:00]: [4.92, 13.95]\n",
"[1990-09-01 00:00:00+00:00]: [4.5, 14.3]\n",
"[1990-10-01 00:00:00+00:00]: [6.67, 13.71]\n",
"[1990-11-01 00:00:00+00:00]: [9.58, 11.24]\n"
]
}
],
"source": [
"hs = HyperStream(loglevel=20)\n",
"print hs\n",
"\n",
"# Tool that reads the CSV file, and a memory stream to hold its output\n",
"reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')\n",
"sea_ice_stream = hs.channel_manager.memory.get_or_create_stream(\"sea_ice\")\n",
"\n",
"# Time interval over which the tool is executed\n",
"ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2012, 1, 1).replace(tzinfo=UTC))\n",
"\n",
"reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)\n",
"\n",
"# Print the first ten instances of the stream\n",
"for key, value in sea_ice_stream.window().items()[:10]:\n",
"    print '[%s]: %s' % (key, value)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stream composition\n",
"\n",
"We can chain streams together through different tools to obtain a new stream. As an example, we can use the tool **csv_reader** to generate a stream from a CSV file. Then we can apply the tool **list_mean**, which computes the mean of the values in each instance of a stream and outputs a new stream. Finally, we can choose whether the new stream stores its output in **memory** or in a **MongoDB** database. In this case, we will store the final stream in the MongoDB database.\n",
"\n",
"|stream||tool||stream||tool||stream|\n",
"|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|\n",
"| csv_file | $\\rightarrow$ | reader_tool | $\\rightarrow$ | sea_ice_stream | $\\rightarrow$ | list_mean_tool | $\\rightarrow$ | sea_ice_means_stream |\n",
"|filesystem||memory||memory||memory||MongoDB|"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1990-02-01 00:00:00+00:00]: 7.74\n",
"[1990-03-01 00:00:00+00:00]: 8.075\n",
"[1990-04-01 00:00:00+00:00]: 8.63\n",
"[1990-05-01 00:00:00+00:00]: 9.105\n",
"[1990-06-01 00:00:00+00:00]: 9.69\n",
"[1990-07-01 00:00:00+00:00]: 9.305\n",
"[1990-08-01 00:00:00+00:00]: 9.435\n",
"[1990-09-01 00:00:00+00:00]: 9.4\n",
"[1990-10-01 00:00:00+00:00]: 10.19\n",
"[1990-11-01 00:00:00+00:00]: 10.41\n"
]
}
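],
"source": [
"# Tool that computes the mean of the values of each instance\n",
"list_mean_tool = hs.tools.list_mean()\n",
"\n",
"# Store the resulting stream in the MongoDB channel\n",
"sea_ice_means_stream = hs.channel_manager.mongo.get_or_create_stream('sea_ice_means')\n",
"list_mean_tool.execute(sources=[sea_ice_stream], sink=sea_ice_means_stream, interval=ti)\n",
"\n",
"# Print the first ten instances of the new stream\n",
"for key, value in sea_ice_means_stream.window().items()[:10]:\n",
"    print '[%s]: %s' % (key, value)"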
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualization\n",
"\n",
"We can now plot all the values of the last computed window. In this case there is only one window with all the data computed by the tool."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"Mean of sea levels in the Arctic and Antarctica"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Split the stream into timestamps (as strings) and values for plotting\n",
"my_time, my_data = zip(*[(str(key), value) for key, value in sea_ice_means_stream.window().items()])\n",
"\n",
"plot_high_chart(my_time, my_data, type=\"high_stock\",\n",
"                title='Mean of sea levels in the Arctic and Antarctica', yax='meters')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}