{ "metadata": { "gist_id": "b5342eea9ff20b79c5c7", "name": "", "signature": "sha256:e798cf5777d300cbeafbf9161efc637458788f88540772f52f7bd645306acc59" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "Parallel Processing Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Email me: email.ryan.kelly@gmail.com" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy.random as npr\n", "import numpy as np\n", "import pandas as pd\n", "from pandas import DataFrame, date_range\n", "\n", "from datetime import datetime, timedelta" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 6 }, { "cell_type": "code", "collapsed": false, "input": [ "np.random.seed(111)\n", "\n", "# Function to generate test data\n", "def CreateDataSet(Number=1):\n", " \n", " Output = []\n", " \n", " for i in range(Number):\n", " \n", " # Create a date range with hour frequency\n", " date = date_range(start='10/1/2012', end='10/31/2012', freq='H')\n", " \n", " # Create long lat data\n", " laty = npr.normal(4815862, 5000,size=len(date))\n", " longx = npr.normal(687993, 5000,size=len(date))\n", " \n", " # status of interest\n", " status = [0,1]\n", " \n", " # Make a random list of statuses\n", " random_status = [status[npr.randint(low=0,high=len(status))] for i in range(len(date))]\n", " \n", " # user pool\n", " user = ['sally','derik','james','bob','ryan','chris']\n", " \n", " # Make a random list of users \n", " random_user = [user[npr.randint(low=0,high=len(user))] for i in range(len(date))]\n", " \n", " Output.extend(zip(random_user, random_status, date, longx, laty))\n", " \n", " return pd.DataFrame(Output, columns = ['user', 'status', 'date', 'long', 'lat'])" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 7 }, { "cell_type": "code", "collapsed": false, "input": [ "data = CreateDataSet(20)\n", "data.head()" ], "language": "python", "metadata": {}, "outputs": [ { "html": [ "
0 ryan 02012-10-01 00:00:00 692823.716714 4810192.808328
1 ryan 12012-10-01 01:00:00 679549.965772 4817783.595967
2 bob 02012-10-01 02:00:00 686339.324152 4823344.768882
3 ryan 02012-10-01 03:00:00 677609.798732 4814085.088514
4 sally 12012-10-01 04:00:00 689556.379975 4811924.332295
" ], "metadata": {}, "output_type": "pyout", "prompt_number": 8, "text": [ " user status date long lat\n", "0 ryan 0 2012-10-01 00:00:00 692823.716714 4810192.808328\n", "1 ryan 1 2012-10-01 01:00:00 679549.965772 4817783.595967\n", "2 bob 0 2012-10-01 02:00:00 686339.324152 4823344.768882\n", "3 ryan 0 2012-10-01 03:00:00 677609.798732 4814085.088514\n", "4 sally 1 2012-10-01 04:00:00 689556.379975 4811924.332295\n", "\n", "[5 rows x 5 columns]" ] } ], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "#some time deltas\n", "before = timedelta(hours = 8)\n", "after = timedelta(minutes = 1)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 167 }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython.parallel import Client\n", "cli = Client()\n", "cli.ids\n", "\n", "cli = Client()\n", "#cli[:].apply_sync(get_pid)\n", "dview=cli[:]\n", "\n", "with dview.sync_imports():\n", " import numpy as np\n", " import os\n", " from datetime import timedelta\n", " import pandas as pd" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "importing numpy on engine(s)\n", "importing os on engine(s)\n", "importing timedelta from datetime on engine(s)\n", "importing pandas on engine(s)\n" ] } ], "prompt_number": 168 }, { "cell_type": "code", "collapsed": false, "input": [ "@dview.parallel(block=True)\n", "def work(df):\n", " before = timedelta(hours = 8)\n", " after = timedelta(minutes = 1)\n", " output = []\n", " #loop through data index's\n", " for i in range(0, len(df)):\n", " l = []\n", " #first we will filter out the data by date to have a smaller list to compute distances for\n", " \n", " #create a mask to query all dates between range for date i\n", " date_mask = (df['date'] >= df['date'].iloc[i]-before) & (df['date'] <= df['date'].iloc[i]+after)\n", " #create a mask to query all users who are not user i (themselves)\n", " user_mask = df['user']!=df['user'].iloc[i]\n", " #apply masks\n", " dists_to_check = df[date_mask & user_mask]\n", " \n", " #for point i, create coordinate to calculate distances from\n", " a = numpy.array((df['long'].iloc[i], df['lat'].iloc[i]))\n", " #create array of distances to check on the masked data\n", " b = numpy.array((dists_to_check['long'].values, dists_to_check['lat'].values))\n", " \n", " #for j in the date queried data\n", " for j in range(1, len(dists_to_check)):\n", " #compute the ueclidean distance between point a and each point of b (the date masked data)\n", " x = numpy.linalg.norm(a-numpy.array((b[0][j], b[1][j])))\n", " \n", " #if the distance is within our range of interest append the index to a list\n", " if x <=1000:\n", " l.append(j)\n", " else:\n", " pass\n", " try:\n", " #use the list of desired index's 'l' to query a final subset of the data\n", " data = dists_to_check.iloc[l]\n", " #summarize the column of interest then append to output list\n", " output.append(data['status'].sum())\n", " except IndexError, e:\n", " pass\n", " output.append(0)\n", " #print \"There were no data to add\"\n", " \n", " return pandas.DataFrame(output)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 194 }, { "cell_type": "code", "collapsed": false, "input": [ "start = datetime.now()\n", "out = work(data)\n", "print datetime.now() - start" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "in sync results \n", "0:00:08.475609" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 195 }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 0 }, { "cell_type": "code", "collapsed": false, "input": [ "c = np.array((data['long'].values, data['lat'].values))\n", "a = np.array((data['long'].iloc[0], data['lat'].iloc[0]))\n", "\n", "def t():\n", " l = []\n", " for j in range(1, 2000):\n", " x = np.linalg.norm(a-np.array((b[0][j], b[1][j])))\n", " \n", " if x <=1000:\n", " l.append(j)\n", " else:\n", " pass" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 124 }, { "cell_type": "code", "collapsed": false, "input": [ "%timeit t" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "10000000 loops, best of 3: 30.4 ns per loop\n" ] } ], "prompt_number": 125 } ], "metadata": {} } ] }