{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"# HyperStream Tutorial 1: Introduction\n",
"\n",
"## Requirements\n",
"\n",
"In order to run this and the following tutorials, it is necessary to have access to a MongoDB server running on **localhost**, port **27017**. The host and port of the MongoDB server can be changed by editing the configuration file __hyperstream_config.json__, located in the same folder as this notebook.\n",
"\n",
"We also require all the dependencies listed in the HyperStream requirements; the installation instructions can be found at https://github.com/IRC-SPHERE/HyperStream\n",
"\n",
"A MongoDB server, a Jupyter Notebook and an MQTT server can all be started by running the script **start_tutorial.sh**."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPython 2.7.6\n",
"IPython 5.3.0\n",
"\n",
"hyperstream 0.3.0-beta\n",
"\n",
"compiler : GCC 4.8.4\n",
"system : Linux\n",
"release : 3.19.0-80-generic\n",
"machine : x86_64\n",
"processor : x86_64\n",
"CPU cores : 4\n",
"interpreter: 64bit\n",
"Git hash : f0e911526041b91fe7999a8968c80618d410e741\n"
]
}
],
"source": [
"%load_ext watermark\n",
"\n",
"import sys\n",
"sys.path.append(\"../\") # Add parent dir in the Path\n",
"\n",
"from hyperstream import HyperStream\n",
"from hyperstream import StreamId\n",
"from hyperstream import TimeInterval\n",
"\n",
"from pytz import UTC\n",
"from datetime import datetime, timedelta\n",
"\n",
"from utils import plot_high_chart\n",
"from utils import unix_time_miliseconds\n",
"\n",
"%watermark -v -m -p hyperstream -g"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Starting a HyperStream instance\n",
"\n",
"First of all, we will create a HyperStream instance. This instance will connect to the MongoDB server that is specified in the configuration file **hyperstream_config.json**."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id \n"
]
}
],
"source": [
"hs = HyperStream(loglevel=0)\n",
"print hs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Selecting a tool\n",
"\n",
"HyperStream comes with a set of predefined tools in `hyperstream.tools`. These tools are used to define the nodes of a factor graph, which either produce values or compute a function of the specified input nodes. For this tutorial, we will focus on the **clock** tool. This tool produces time ticks, given an initial time and a stride in seconds between consecutive ticks. In this case we will generate one tick every hour."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"clock_tool = hs.tools.clock(stride=1.0*60*60)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Storing the result in a Stream\n",
"\n",
"We need to specify where to store the data that will be generated. A Stream can be stored in memory or in a MongoDB database. In this tutorial we use the memory channel: we first get the memory channel from the channel manager and then create the stream in that channel."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"M = hs.channel_manager.memory\n",
"# M = hs.channel_manager.mongo # To store the results in a MongoDB database\n",
"\n",
"clock_stream = M.get_or_create_stream(stream_id=StreamId(name=\"clock_stream\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Definition of a time interval\n",
"\n",
"Now we can specify the time interval over which we want to execute the tool. In this example we will ask for a 10-hour interval starting 24 hours ago."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"yesterday_start = (datetime.utcnow() - timedelta(days=1)).replace(tzinfo=UTC)\n",
"yesterday_end = yesterday_start + timedelta(hours=10)  # the addition preserves tzinfo\n",
"\n",
"ti = TimeInterval(yesterday_start, yesterday_end)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Executing the tool\n",
"\n",
"Now that we have defined the tool to use, where to store the results and the time interval, we can execute the tool. All the computed results will then be available in the Stream specified as the __sink__."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00]\n"
]
}
],
"source": [
"clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)\n",
"\n",
"print clock_stream.calculated_intervals"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Printing the results\n",
"\n",
"The results are now stored in the stream `clock_stream`. We can retrieve them as a list of tuples, each containing a timestamp and its corresponding clock value."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00\n",
"[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00\n",
"[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00\n",
"[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00\n",
"[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00\n",
"[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00\n",
"[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00\n",
"[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00\n",
"[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00\n",
"[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00\n"
]
}
],
"source": [
"for timestamp, value in clock_stream.window().items():\n",
" print '[%s]: %s' % (timestamp, value)"
]
},
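{
"cell_type": "markdown",
"metadata": {},
"source": [
"The timestamps printed above fall on whole hours even though the interval starts at an arbitrary instant. This is consistent with ticks being aligned to multiples of the stride, and with the interval being open on the left and closed on the right. The following is a minimal sketch of that idea in plain Python. It is not HyperStream's implementation: the function name `clock_ticks`, the choice of the Unix epoch as the alignment origin and the use of naive UTC datetimes are all assumptions made for illustration.\n",
"\n",
"```python\n",
"import math\n",
"from datetime import datetime, timedelta\n",
"\n",
"EPOCH = datetime(1970, 1, 1)  # assumed alignment origin (naive UTC)\n",
"\n",
"def clock_ticks(start, end, stride_seconds):\n",
"    # Return the tick times t with start < t <= end, where every tick\n",
"    # is a whole multiple of stride_seconds after EPOCH.\n",
"    start_s = (start - EPOCH).total_seconds()\n",
"    end_s = (end - EPOCH).total_seconds()\n",
"    # Index of the first multiple of the stride strictly after start.\n",
"    n = int(math.floor(start_s / stride_seconds)) + 1\n",
"    ticks = []\n",
"    while n * stride_seconds <= end_s:\n",
"        ticks.append(EPOCH + timedelta(seconds=n * stride_seconds))\n",
"        n += 1\n",
"    return ticks\n",
"\n",
"start = datetime(2017, 7, 20, 16, 0, 37)\n",
"for tick in clock_ticks(start, start + timedelta(hours=10), 60 * 60):\n",
"    print(tick)\n",
"```\n",
"\n",
"With these values the sketch prints ten ticks, from 17:00 on the first day to 02:00 on the next, which matches the pattern of the output shown above."
]
},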
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query the Stream\n",
"\n",
"We can query the stream over any particular time interval. For example, we can ask for the first 5 hours of the interval computed above."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00\n",
"[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00\n",
"[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00\n",
"[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00\n",
"[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00\n"
]
}
],
"source": [
"ti = TimeInterval(yesterday_start, yesterday_start + timedelta(hours=5))\n",
"\n",
"for timestamp, value in clock_stream.window(ti).items():\n",
" print '[%s]: %s' % (timestamp, value)"
]
},
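{
"cell_type": "markdown",
"metadata": {},
"source": [
"Conceptually, querying a stream with a time interval amounts to keeping only the items whose timestamp lies inside that interval. The sketch below illustrates this in plain Python, assuming the items are `(timestamp, value)` pairs and that the interval is open on the left and closed on the right, as the printed intervals suggest. The helper `window_items` is hypothetical and is not part of the HyperStream API.\n",
"\n",
"```python\n",
"from datetime import datetime, timedelta\n",
"\n",
"def window_items(items, start, end):\n",
"    # Keep the (timestamp, value) pairs with start < timestamp <= end.\n",
"    return [(t, v) for t, v in items if start < t <= end]\n",
"\n",
"# Ten hourly items, from 17:00 to 02:00, each storing its own timestamp.\n",
"first = datetime(2017, 7, 20, 17, 0)\n",
"items = [(first + timedelta(hours=h), first + timedelta(hours=h)) for h in range(10)]\n",
"\n",
"start = datetime(2017, 7, 20, 16, 0, 37)\n",
"for timestamp, value in window_items(items, start, start + timedelta(hours=5)):\n",
"    print('[%s]: %s' % (timestamp, value))\n",
"```\n",
"\n",
"A timestamp equal to the start of the interval is excluded, while one equal to its end is included."
]
},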
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Executing a new interval\n",
"\n",
"It is possible to execute the tool again with a different interval. Because this interval has not yet been computed in the specified Stream, the tool will compute the new data. If the interval had already been computed, the tool would not do any computation. As an example, we will execute the tool for the past 5 hours."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00] U (2017-07-21 11:00:37.945814+00:00, 2017-07-21 16:00:37.945814+00:00]\n"
]
}
],
"source": [
"today_end = datetime.utcnow().replace(tzinfo=UTC)\n",
"today_start = today_end - timedelta(hours=5)\n",
"\n",
"ti = TimeInterval(today_start, today_end)\n",
"\n",
"clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)\n",
"print clock_stream.calculated_intervals"
]
},
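{
"cell_type": "markdown",
"metadata": {},
"source": [
"The calculated intervals printed above form a union of two disjoint intervals. One way to picture the bookkeeping behind this is sketched below in plain Python: the computed intervals are kept as a merged list, and a new request is reduced to the parts that are still missing. This is only an illustration under stated assumptions. Intervals are plain `(start, end)` tuples of numbers (hours, say), the helpers `merge` and `missing` are hypothetical, and none of this is HyperStream's actual code.\n",
"\n",
"```python\n",
"def merge(intervals):\n",
"    # Union of (start, end) pairs: sort them, then fuse any that overlap or touch.\n",
"    merged = []\n",
"    for start, end in sorted(intervals):\n",
"        if merged and start <= merged[-1][1]:\n",
"            merged[-1] = (merged[-1][0], max(merged[-1][1], end))\n",
"        else:\n",
"            merged.append((start, end))\n",
"    return merged\n",
"\n",
"def missing(requested, computed):\n",
"    # Parts of the requested interval that no computed interval covers.\n",
"    start, end = requested\n",
"    gaps = []\n",
"    for c_start, c_end in merge(computed):\n",
"        if c_end <= start or c_start >= end:\n",
"            continue  # no overlap with what is left of the request\n",
"        if c_start > start:\n",
"            gaps.append((start, c_start))\n",
"        start = max(start, c_end)\n",
"    if start < end:\n",
"        gaps.append((start, end))\n",
"    return gaps\n",
"\n",
"computed = [(0, 10)]                    # first execution: hours 0 to 10\n",
"print(missing((0, 10), computed))       # [] : nothing left to compute\n",
"print(missing((19, 24), computed))      # [(19, 24)] : the whole request is new\n",
"print(merge(computed + [(19, 24)]))     # [(0, 10), (19, 24)]\n",
"```\n",
"\n",
"A request that is already covered yields no work, while a new request is computed and then added to the union."
]
},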
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can see that the full Stream contains two separate time intervals."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00\n",
"[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00\n",
"[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00\n",
"[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00\n",
"[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00\n",
"[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00\n",
"[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00\n",
"[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00\n",
"[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00\n",
"[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00\n",
"[2017-07-21 12:00:00+00:00]: 2017-07-21 12:00:00+00:00\n",
"[2017-07-21 13:00:00+00:00]: 2017-07-21 13:00:00+00:00\n",
"[2017-07-21 14:00:00+00:00]: 2017-07-21 14:00:00+00:00\n",
"[2017-07-21 15:00:00+00:00]: 2017-07-21 15:00:00+00:00\n",
"[2017-07-21 16:00:00+00:00]: 2017-07-21 16:00:00+00:00\n"
]
}
],
"source": [
"ti = TimeInterval(yesterday_start, today_end)\n",
"\n",
"for timestamp, value in clock_stream.window(ti).items():\n",
" print '[%s]: %s' % (timestamp, value)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualization\n",
"\n",
"For this and the following tutorials we will use the JavaScript library [highcharts][1]. We have created a Python function called **plot_high_chart** for a single time series and **plot_multiple_stock** for multiple time series.\n",
"\n",
"We can then visualize the two intervals over which the tool has been executed.\n",
"\n",
"Be aware that there is no data for the time between the two intervals: the chart jumps from one point straight to the next, so the time axis is not linear across the gap.\n",
"\n",
"[1]: https://www.highcharts.com/"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"\n",
" \n",
" \n",
" Time from epoch\n",
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
"\n",
" \n",
"\n",
" \n",
"\n",
" \n",
" \n",
" "
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"my_time, my_data = zip(*[(str(key), unix_time_miliseconds(value)) for key, value in clock_stream.window(ti).items()])\n",
"\n",
"plot_high_chart(my_time, my_data, yax='milliseconds', title='Time from epoch', type='high_stock')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}