{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# What borough has the fastest NYC taxi drivers?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from __future__ import division" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%reload_ext autotime" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from blaze import by, compute, transform, Data, sin, cos, atan2, sqrt, radians, summary, greatest, symbol\n", "from odo import odo, drop, resource, Temp, CSV, S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "connect_args = dict(sslmode='verify-ca')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "d = Data('redshift://cio@localhost:15439/dev::trip', connect_args=connect_args)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "d.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def haversine_distance(start, stop, R=3959):\n", " \"\"\"Compute the distance between two sets of `start` and `stop` lat, lon points\n", " \"\"\"\n", " # http://andrew.hedges.name/experiments/haversine/\n", " lat1, lon1 = start\n", " lat2, lon2 = stop\n", " dlon = lon2 - lon1\n", " dlat = lat2 - lat1\n", " a = sin(radians(dlat) / 2.0) ** 2 + cos(lat1) * cos(lat2) * sin(radians(dlon) / 2.0) ** 2\n", " return R * 2 * atan2(sqrt(greatest(a, 0.0)), sqrt(greatest(1.0 - a, 0.0)))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "max_distance = haversine_distance(start=[40.477399, -74.259090], stop=[40.917577, -73.700272])\n", "max_distance" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reduce the data to the area defined as NYC" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# http://hafen.github.io/taxi/#reading-in-to-r\n", "\n", "min_lat, max_lat = 40.477399, 40.917577\n", "min_lon, max_lon = -74.259090, -73.700272\n", "\n", "valid = d[\n", " (d.trip_distance > 0) &\n", " (d.trip_distance <= max_distance) &\n", " (d.trip_time_in_secs > 0) &\n", " (d.passenger_count > 0) & (d.passenger_count <= 5) &\n", " (d.pickup_latitude >= min_lat) & (d.pickup_latitude <= max_lat) &\n", " (d.dropoff_latitude >= min_lat) & (d.dropoff_latitude <= max_lat) &\n", " (d.pickup_longitude >= min_lon) & (d.pickup_longitude <= max_lon) &\n", " (d.dropoff_longitude >= min_lon) & (d.dropoff_longitude <= max_lon)\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "valid" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "computed_distance = haversine_distance(start=[valid.pickup_latitude, valid.pickup_longitude],\n", " stop=[valid.dropoff_latitude, valid.dropoff_longitude])\n", "trip_time_in_hours = valid.trip_time_in_secs.coerce('float64') / 3600.0\n", "calcd = transform(valid,\n", " avg_speed_in_mph=valid.trip_distance / trip_time_in_hours,\n", " trip_time_in_hours=trip_time_in_hours)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "calcd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Assume that > 120 MPH is invalid" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "data = calcd[(calcd.avg_speed_in_mph > 5) & (calcd.avg_speed_in_mph <= 120)][\n", " [\n", " 'avg_speed_in_mph', \n", " 'pickup_latitude', \n", " 'pickup_longitude', \n", " 'dropoff_latitude', \n", " 'dropoff_longitude'\n", " ]\n", "]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(compute(data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A small aside, with `odo`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's look at the distribution of rides by borough\n", "\n", "#### Q: Where do we get borough geolocation data?\n", "#### A: From NYC's own API!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import requests as r" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "resp = r.get(\n", " 'https://data.cityofnewyork.us/api/geospatial/tqmj-j8zm',\n", " params=dict(method='export', format='KML')\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import lxml\n", "import lxml.etree" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Parse the KML with `lxml`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "kml = lxml.etree.fromstring(resp.text.encode('utf8'))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(resp.text)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### `Placemark` elements delineate boroughs" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "placemarks = kml.xpath(\"//*[local-name()='Placemark']\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create some functions to pull out the borough names and points" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from toolz.curried import map\n", "from toolz import pipe, concat\n", "\n", "import io\n", "\n", "\n", "def find_borough_name(text):\n", " return re.search(r'.*(Brooklyn|Manhattan|Bronx|Staten Island|Queens).*', text).group(1)\n", "\n", "\n", "def parse_coords(p, name):\n", " coords = p.xpath('.//*[local-name()=\"coordinates\"]/text()')\n", " return pipe(coords,\n", " map(str.split),\n", " map(lambda x: '\\n'.join(map(lambda y: '%s,%s' % (y, name), x))),\n", " '\\n'.join)\n", "\n", "\n", "def parse_kml(placemarks):\n", " result = []\n", " \n", " for p in placemarks:\n", " desc, = p.xpath(\".//*[local-name()='description']\")\n", " name = find_borough_name(desc.text) \n", " result.append(parse_coords(p, name))\n", " return '\\n'.join(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Parse the location data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "geodata = parse_kml(placemarks)\n", "geodata[:geodata.find('\\n')]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect our data with pandas" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df = pd.read_csv(io.StringIO(geodata), names=['lon', 'lat', 'name']).drop_duplicates().reset_index(drop=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### After we're satisfied, let's throw everything into redshift for later analysis" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "boros = odo(\n", " odo(df, Temp(S3(CSV))),\n", " 'redshift://cio@localhost:15439/dev::boros',\n", " connect_args=connect_args\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "len(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Compute the bounding box for each borough" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "bounds = sa.select([\n", " boros.c.name,\n", " sa.func.min(boros.c.lon).label('min_lon'),\n", " sa.func.min(boros.c.lat).label('min_lat'),\n", " sa.func.max(boros.c.lon).label('max_lon'),\n", " sa.func.max(boros.c.lat).label('max_lat')\n", "]).group_by(boros.c.name).alias()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(bounds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get out the SQLAlchemy table when blaze isn't enough" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "nyc = compute(data).alias()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "joined = nyc.join(\n", " bounds,\n", " onclause=(\n", " (nyc.c.pickup_longitude >= bounds.c.min_lon) &\n", " (nyc.c.pickup_longitude <= bounds.c.max_lon) &\n", " (nyc.c.pickup_latitude >= bounds.c.min_lat) &\n", " (nyc.c.pickup_latitude <= bounds.c.max_lat) &\n", " (nyc.c.dropoff_longitude >= bounds.c.min_lon) &\n", " (nyc.c.dropoff_longitude <= bounds.c.max_lon) &\n", " (nyc.c.dropoff_latitude >= bounds.c.min_lat) &\n", " (nyc.c.dropoff_latitude <= bounds.c.max_lat)\n", " )\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "sel = sa.select([\n", " bounds.c.name,\n", " nyc.c.avg_speed_in_mph, \n", " nyc.c.pickup_latitude,\n", " nyc.c.pickup_longitude,\n", " nyc.c.dropoff_latitude,\n", " nyc.c.dropoff_longitude\n", "]).select_from(joined).alias()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "per_boro_speeds = sa.select([\n", " sel.c.name,\n", " sa.func.avg(sel.c.avg_speed_in_mph).label('avg_speed'),\n", " sa.func.count(sel.c.avg_speed_in_mph).label('nsamples')\n", "]).group_by(sel.c.name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import sqlparse" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(sqlparse.format(str(per_boro_speeds), reindent=True))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "speeds = odo(\n", " per_boro_speeds.order_by(sa.desc(per_boro_speeds.c.avg_speed)),\n", " pd.DataFrame,\n", " connect_args=connect_args\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "speeds" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.4.3" } }, "nbformat": 4, "nbformat_minor": 0 }