{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import dask.dataframe as dd\n", "import dask.distributed\n", "import numpy as np" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "client = dask.distributed.Client()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "Index(['dropoff_datetime', 'dropoff_latitude', 'dropoff_longitude',\n", " 'dropoff_taxizone_id', 'ehail_fee', 'extra', 'fare_amount',\n", " 'improvement_surcharge', 'mta_tax', 'passenger_count', 'payment_type',\n", " 'pickup_latitude', 'pickup_longitude', 'pickup_taxizone_id',\n", " 'rate_code_id', 'store_and_fwd_flag', 'tip_amount', 'tolls_amount',\n", " 'total_amount', 'trip_distance', 'trip_type', 'vendor_id', 'trip_id'],\n", " dtype='object')" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Print column names\n", "\n", "df0 = dd.read_parquet('/data/all_trips.parquet', engine='fastparquet', index='pickup_datetime')\n", "df0.columns" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Load only the columns we need, large speedup.\n", "\n", "df = dd.read_parquet('/data/all_trips_spark.parquet', engine='arrow', \n", " columns=[\n", "# 'pickup_datetime', \n", " 'pickup_longitude', 'pickup_latitude', #'pickup_taxizone_id',\n", "# 'dropoff_datetime', \n", " 'dropoff_longitude', 'dropoff_latitude', #'dropoff_taxizone_id',\n", "# 'trip_type', \n", "# 'passenger_count'\n", " 'total_amount'\n", " ])\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
pickup_longitudepickup_latitudedropoff_longitudedropoff_latitudetotal_amount
0-73.96591940.771244-73.94960840.7770585.800000
1-73.99748240.725952-74.00593640.7357035.400000
2-73.96479840.767391-73.97775340.7737465.800000
3-74.01159740.708832-74.01346640.7093584.600000
4-74.00064840.718578-73.94458040.71236827.799999
\n", "
" ], "text/plain": [ " pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude \\\n", "0 -73.965919 40.771244 -73.949608 40.777058 \n", "1 -73.997482 40.725952 -74.005936 40.735703 \n", "2 -73.964798 40.767391 -73.977753 40.773746 \n", "3 -74.011597 40.708832 -74.013466 40.709358 \n", "4 -74.000648 40.718578 -73.944580 40.712368 \n", "\n", " total_amount \n", "0 5.800000 \n", "1 5.400000 \n", "2 5.800000 \n", "3 4.600000 \n", "4 27.799999 " ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
pickup_longitudepickup_latitudedropoff_longitudedropoff_latitudetotal_amount
1432504NaNNaNNaNNaN8.300000
1432505NaNNaNNaNNaN17.299999
1432506NaNNaNNaNNaN41.759998
1432507NaNNaNNaNNaN6.300000
1432508NaNNaNNaNNaN14.300000
\n", "
" ], "text/plain": [ " pickup_longitude pickup_latitude dropoff_longitude \\\n", "1432504 NaN NaN NaN \n", "1432505 NaN NaN NaN \n", "1432506 NaN NaN NaN \n", "1432507 NaN NaN NaN \n", "1432508 NaN NaN NaN \n", "\n", " dropoff_latitude total_amount \n", "1432504 NaN 8.300000 \n", "1432505 NaN 17.299999 \n", "1432506 NaN 41.759998 \n", "1432507 NaN 6.300000 \n", "1432508 NaN 14.300000 " ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.tail()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Select only those points within some reasonable bounds (half a degree)\n", "\n", "df = df[df.pickup_latitude.notnull() & df.pickup_longitude.notnull() \n", " & ((df.pickup_latitude - 40.75).abs() <= 0.5) \n", " & ((df.pickup_longitude + 73.9).abs() <= 0.5)\n", " ]\n", "df = df[df.dropoff_latitude.notnull() & df.dropoff_longitude.notnull() \n", " & ((df.dropoff_latitude - 40.75).abs() <= 0.5) \n", " & ((df.dropoff_longitude + 73.9).abs() <= 0.5)\n", " ]" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "pickup_longitude 1268170371\n", "pickup_latitude 1268170371\n", "dropoff_longitude 1268170371\n", "dropoff_latitude 1268170371\n", "total_amount 1268170371\n", "dtype: int64" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# We get about 1.27 billion points\n", "df.count().compute()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
pickup_longitudepickup_latitudedropoff_longitudedropoff_latitudetotal_amount
0-73.96591940.771244-73.94960840.7770585.800000
1-73.99748240.725952-74.00593640.7357035.400000
2-73.96479840.767391-73.97775340.7737465.800000
3-74.01159740.708832-74.01346640.7093584.600000
4-74.00064840.718578-73.94458040.71236827.799999
\n", "
" ], "text/plain": [ " pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude \\\n", "0 -73.965919 40.771244 -73.949608 40.777058 \n", "1 -73.997482 40.725952 -74.005936 40.735703 \n", "2 -73.964798 40.767391 -73.977753 40.773746 \n", "3 -74.011597 40.708832 -74.013466 40.709358 \n", "4 -74.000648 40.718578 -73.944580 40.712368 \n", "\n", " total_amount \n", "0 5.800000 \n", "1 5.400000 \n", "2 5.800000 \n", "3 4.600000 \n", "4 27.799999 " ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def convert_lon(d, latvar):\n", " \"Convert longitude to web mercator\"\n", " k = d[latvar].copy()\n", " k = (20037508.34 / 180) * (np.log(np.tan((90. + d[latvar]) * np.pi/360))/(np.pi/180.))\n", " return k" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Convert lats and lons to web mercator projection\n", "df['pickup_longitude'] = df.pickup_longitude * (20037508.34 / 180)\n", "df['pickup_latitude'] = df.map_partitions(convert_lon, 'pickup_latitude')\n", "df['dropoff_longitude'] = df.dropoff_longitude * (20037508.34 / 180)\n", "df['dropoff_latitude'] = df.map_partitions(convert_lon, 'dropoff_latitude')" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Consolidate partitions for faster plotting\n", "df.repartition(npartitions=200).to_parquet('/tmp/filtered.parquet', compression='SNAPPY')" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Read the consolidated data back in\n", "df = dd.read_parquet('/tmp/filtered.parquet')" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Subsample the data \n", "# It's currently commented out, but it's useful \n", "# when iterating on plot details (axes, ranges, etc.), \n", "# as it greatly speeds up plot redrawing. \n", "\n", "# df = client.persist(df.sample(frac=0.02))" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
pickup_longitudepickup_latitudedropoff_longitudedropoff_latitudetotal_amount
0-8233848.54978660.0-8232033.04979513.05.800000
1-8237362.04972004.0-8238303.04973436.55.400000
2-8233724.04978093.5-8235166.04979025.55.800000
3-8238933.54969490.0-8239141.54969566.04.600000
4-8237714.54970922.5-8231473.04970009.027.799999
\n", "
" ], "text/plain": [ " pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude \\\n", "0 -8233848.5 4978660.0 -8232033.0 4979513.0 \n", "1 -8237362.0 4972004.0 -8238303.0 4973436.5 \n", "2 -8233724.0 4978093.5 -8235166.0 4979025.5 \n", "3 -8238933.5 4969490.0 -8239141.5 4969566.0 \n", "4 -8237714.5 4970922.5 -8231473.0 4970009.0 \n", "\n", " total_amount \n", "0 5.800000 \n", "1 5.400000 \n", "2 5.800000 \n", "3 4.600000 \n", "4 27.799999 " ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 61, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import datashader as ds\n", "import datashader.transfer_functions as tf\n", "\n", "import datashader as ds\n", "from datashader.bokeh_ext import InteractiveImage\n", "from functools import partial\n", "from datashader.utils import export_image\n", "from datashader.colors import colormap_select, Greys9, Hot, viridis, inferno\n", "from IPython.core.display import HTML, display" ] }, { "cell_type": "code", "execution_count": 62, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", " \n", " Loading BokehJS ...\n", "
" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/javascript": [ "\n", "(function(global) {\n", " function now() {\n", " return new Date();\n", " }\n", "\n", " var force = true;\n", "\n", " if (typeof (window._bokeh_onload_callbacks) === \"undefined\" || force === true) {\n", " window._bokeh_onload_callbacks = [];\n", " window._bokeh_is_loading = undefined;\n", " }\n", "\n", "\n", " \n", " if (typeof (window._bokeh_timeout) === \"undefined\" || force === true) {\n", " window._bokeh_timeout = Date.now() + 5000;\n", " window._bokeh_failed_load = false;\n", " }\n", "\n", " var NB_LOAD_WARNING = {'data': {'text/html':\n", " \"
\\n\"+\n", " \"

\\n\"+\n", " \"BokehJS does not appear to have successfully loaded. If loading BokehJS from CDN, this \\n\"+\n", " \"may be due to a slow or bad network connection. Possible fixes:\\n\"+\n", " \"

\\n\"+\n", " \"\\n\"+\n", " \"\\n\"+\n", " \"from bokeh.resources import INLINE\\n\"+\n", " \"output_notebook(resources=INLINE)\\n\"+\n", " \"\\n\"+\n", " \"
\"}};\n", "\n", " function display_loaded() {\n", " if (window.Bokeh !== undefined) {\n", " document.getElementById(\"8d091618-c77e-4686-a40c-8a673de9b2a6\").textContent = \"BokehJS successfully loaded.\";\n", " } else if (Date.now() < window._bokeh_timeout) {\n", " setTimeout(display_loaded, 100)\n", " }\n", " }\n", "\n", " function run_callbacks() {\n", " window._bokeh_onload_callbacks.forEach(function(callback) { callback() });\n", " delete window._bokeh_onload_callbacks\n", " console.info(\"Bokeh: all callbacks have finished\");\n", " }\n", "\n", " function load_libs(js_urls, callback) {\n", " window._bokeh_onload_callbacks.push(callback);\n", " if (window._bokeh_is_loading > 0) {\n", " console.log(\"Bokeh: BokehJS is being loaded, scheduling callback at\", now());\n", " return null;\n", " }\n", " if (js_urls == null || js_urls.length === 0) {\n", " run_callbacks();\n", " return null;\n", " }\n", " console.log(\"Bokeh: BokehJS not loaded, scheduling load and callback at\", now());\n", " window._bokeh_is_loading = js_urls.length;\n", " for (var i = 0; i < js_urls.length; i++) {\n", " var url = js_urls[i];\n", " var s = document.createElement('script');\n", " s.src = url;\n", " s.async = false;\n", " s.onreadystatechange = s.onload = function() {\n", " window._bokeh_is_loading--;\n", " if (window._bokeh_is_loading === 0) {\n", " console.log(\"Bokeh: all BokehJS libraries loaded\");\n", " run_callbacks()\n", " }\n", " };\n", " s.onerror = function() {\n", " console.warn(\"failed to load library \" + url);\n", " };\n", " console.log(\"Bokeh: injecting script tag for BokehJS library: \", url);\n", " document.getElementsByTagName(\"head\")[0].appendChild(s);\n", " }\n", " };var element = document.getElementById(\"8d091618-c77e-4686-a40c-8a673de9b2a6\");\n", " if (element == null) {\n", " console.log(\"Bokeh: ERROR: autoload.js configured with elementid '8d091618-c77e-4686-a40c-8a673de9b2a6' but no matching script tag was found. \")\n", " return false;\n", " }\n", "\n", " var js_urls = [\"https://cdn.pydata.org/bokeh/release/bokeh-0.12.4.min.js\", \"https://cdn.pydata.org/bokeh/release/bokeh-widgets-0.12.4.min.js\"];\n", "\n", " var inline_js = [\n", " function(Bokeh) {\n", " Bokeh.set_log_level(\"info\");\n", " },\n", " \n", " function(Bokeh) {\n", " \n", " document.getElementById(\"8d091618-c77e-4686-a40c-8a673de9b2a6\").textContent = \"BokehJS is loading...\";\n", " },\n", " function(Bokeh) {\n", " console.log(\"Bokeh: injecting CSS: https://cdn.pydata.org/bokeh/release/bokeh-0.12.4.min.css\");\n", " Bokeh.embed.inject_css(\"https://cdn.pydata.org/bokeh/release/bokeh-0.12.4.min.css\");\n", " console.log(\"Bokeh: injecting CSS: https://cdn.pydata.org/bokeh/release/bokeh-widgets-0.12.4.min.css\");\n", " Bokeh.embed.inject_css(\"https://cdn.pydata.org/bokeh/release/bokeh-widgets-0.12.4.min.css\");\n", " }\n", " ];\n", "\n", " function run_inline_js() {\n", " \n", " if ((window.Bokeh !== undefined) || (force === true)) {\n", " for (var i = 0; i < inline_js.length; i++) {\n", " inline_js[i](window.Bokeh);\n", " }if (force === true) {\n", " display_loaded();\n", " }} else if (Date.now() < window._bokeh_timeout) {\n", " setTimeout(run_inline_js, 100);\n", " } else if (!window._bokeh_failed_load) {\n", " console.log(\"Bokeh: BokehJS failed to load within specified timeout.\");\n", " window._bokeh_failed_load = true;\n", " } else if (force !== true) {\n", " var cell = $(document.getElementById(\"8d091618-c77e-4686-a40c-8a673de9b2a6\")).parents('.cell').data().cell;\n", " cell.output_area.append_execute_result(NB_LOAD_WARNING)\n", " }\n", "\n", " }\n", "\n", " if (window._bokeh_is_loading === 0) {\n", " console.log(\"Bokeh: BokehJS loaded, going straight to plotting\");\n", " run_inline_js();\n", " } else {\n", " load_libs(js_urls, function() {\n", " console.log(\"Bokeh: BokehJS plotting callback run at\", now());\n", " run_inline_js();\n", " });\n", " }\n", "}(this));" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from bokeh.models import BoxZoomTool\n", "from bokeh.plotting import figure, output_notebook, show\n", "\n", "output_notebook()\n", "\n", "#set centers, bounds, and ranges in web mercator coords\n", "x_center = -8234000 \n", "y_center = 4973000\n", "\n", "x_half_range = 30000\n", "y_half_range = 25000\n", "\n", "NYC = x_range, y_range = ((x_center - x_half_range, x_center + x_half_range), \n", " (y_center-y_half_range, y_center+y_half_range))\n", "\n", "# plot_width scales (quadratically?) with memory consumption.\n", "# With 32GB RAM, I can set this to 2000, but 2500 crashes with MemoryError.\n", "# I used this setting for high quality, large plots. \n", "# plot_width = 2000 \n", "\n", "# plot_width of 400 seems to require less than 4GB, and makes the notebook more manageable. \n", "# Also changes aesthetic appearance by decreasing GPS \"noise\" due to coarse binning\n", "plot_width = 400 \n", "\n", "# auto calculate from width\n", "plot_height = int(plot_width/(x_half_range/y_half_range))\n", "\n", "def base_plot(tools='pan,wheel_zoom,reset,save',plot_width=plot_width, \n", " plot_height=plot_height, **plot_args):\n", " p = figure(tools=tools, plot_width=plot_width, plot_height=plot_height,\n", " x_range=x_range, y_range=y_range, outline_line_color=None,\n", " min_border=0, min_border_left=0, min_border_right=0,\n", " min_border_top=0, min_border_bottom=0, **plot_args)\n", " \n", " p.axis.visible = False\n", " p.xgrid.grid_line_color = None\n", " p.ygrid.grid_line_color = None\n", " \n", " p.add_tools(BoxZoomTool(match_aspect=True))\n", " \n", " return p\n", " \n", "options = dict(line_color=None, fill_color='blue', size=5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pickups" ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "collapsed": true }, "outputs": [], "source": [ "background = \"black\"\n", "export = partial(export_image, export_path=\"export\", background=background)\n", "cm = partial(colormap_select, reverse=(background==\"black\"))\n", "\n", "\n", "def create_image_1(x_range, y_range, w=plot_width, h=plot_height):\n", " cvs = ds.Canvas(plot_width=w, plot_height=h, x_range=x_range, y_range=y_range)\n", " agg = cvs.points(df, 'pickup_longitude', 'pickup_latitude', ds.count('total_amount'))\n", " img = tf.shade(agg, cmap=viridis, how='eq_hist')\n", " return tf.dynspread(img, threshold=0.5, max_px=4)" ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "collapsed": false, "scrolled": false }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n", "
\n", "
\n", "" ], "text/plain": [ "" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "p = base_plot(background_fill_color=background)\n", "export(create_image_1(x_range, y_range, plot_width, plot_height),\"pickups_large_wide\")\n", "InteractiveImage(p, create_image_1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dropoffs" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "collapsed": true }, "outputs": [], "source": [ "background = \"black\"\n", "export = partial(export_image, export_path=\"export\", background=background)\n", "cm = partial(colormap_select, reverse=(background==\"black\"))\n", "\n", "\n", "def create_image_2(x_range, y_range, w=plot_width, h=plot_height):\n", " cvs = ds.Canvas(plot_width=w, plot_height=h, x_range=x_range, y_range=y_range)\n", " agg = cvs.points(df, 'dropoff_longitude', 'dropoff_latitude', ds.count('total_amount'))\n", " img = tf.shade(agg, cmap=inferno, how='eq_hist')\n", " return tf.dynspread(img, threshold=0.5, max_px=4)" ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n", "
\n", "
\n", "