{ "metadata": { "name": "", "signature": "sha256:90b05075a21795c44229ffba4a49fb50e4927cbed7e5807fca24165b465c3237" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "code", "collapsed": false, "input": [ "%pylab inline" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Populating the interactive namespace from numpy and matplotlib\n" ] } ], "prompt_number": 1 }, { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "Call and SMS data aggretation" ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Imports" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from pandas import *" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "code", "collapsed": false, "input": [ "from pandas.io.pytables import HDFStore\n", "from pandas.tseries.index import to_offset" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 3 }, { "cell_type": "code", "collapsed": false, "input": [ "import dateutil" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 4 }, { "cell_type": "code", "collapsed": false, "input": [ "from joblib import Parallel, delayed" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 5 }, { "cell_type": "code", "collapsed": false, "input": [ "import time" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 }, { "cell_type": "code", "collapsed": false, "input": [ "from datetime import timedelta" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 7 }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Aggregation on time span" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def process_period(day, delta, span='1D'):\n", " \"\"\"Process a time slice [day, day+delta) and aggregate on time span.\n", " \"\"\"\n", " \n", " # load time slice from hdf5 (creted by hdf5_dataset.ipynb)\n", " store = HDFStore('stores/sms-call-internet-mi-table-blosc.h5')\n", " intensity_data = store.select('telco_data', \"index >= Timestamp('%s') & index < Timestamp('%s')\" % (day, day + delta))\n", " intensity_data = intensity_data.fillna(0)\n", " \n", " # groupby 'groups' and aggregate data in 'columns'\n", " groups = [\"Square_id\", 'Country_code']\n", " columns = ['SMS_in', 'SMS_out', 'Call_in', 'Call_out']\n", " \n", " # TODO: resample()?\n", " result = intensity_data.groupby(TimeGrouper(span)).apply(lambda x: x.groupby(groups)[columns].sum()) \n", " \n", " store.close()\n", " \n", " return result" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "start_date = datetime(2013,11,1)\n", "end_date = datetime(2013,12,31)\n", "interval_freq = '1D'\n", "\n", "interval = to_offset(interval_freq).delta\n", "days = date_range(start_date, end_date, freq=interval_freq, tz='Europe/Rome')" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 }, { "cell_type": "code", "collapsed": false, "input": [ "print datetime.now()\n", "# NOTE: must be <= interval_freq\n", "time_span = '1D'\n", "days_result = Parallel(n_jobs=8, verbose=5)(delayed(process_period)(day, interval, time_span) for day in days)\n", "print datetime.now()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "2015-01-25 17:58:26.919533\n" ] }, { "output_type": "stream", "stream": "stderr", "text": [ "[Parallel(n_jobs=8)]: Done 1 out of 61 | elapsed: 10.5s remaining: 10.5min\n", "[Parallel(n_jobs=8)]: Done 11 out of 61 | elapsed: 23.3s remaining: 1.8min\n", "[Parallel(n_jobs=8)]: Done 24 out of 61 | elapsed: 35.3s remaining: 54.4s\n", "[Parallel(n_jobs=8)]: Done 37 out of 61 | elapsed: 57.7s remaining: 37.4s\n", "[Parallel(n_jobs=8)]: Done 50 out of 61 | elapsed: 1.3min remaining: 17.0s\n", "[Parallel(n_jobs=8)]: Done 61 out of 61 | elapsed: 1.4min finished\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "2015-01-25 17:59:51.246540\n" ] } ], "prompt_number": 10 }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Store result" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# NOTE: common store for the pipeline\n", "store = HDFStore('./stores/aggregated_dataset.h5') # test\n", "\n", "if 'intensity_' + time_span in store:\n", " raise Exception('refusing to overwrite')\n", "\n", "for result in days_result:\n", " store.append('intensity_' + time_span, result)\n", "\n", "store.flush()\n", "store.close()" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 11 } ], "metadata": {} } ] }