{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Analyzing flight data with `dask.dataframe`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook we do a brief demo of using `dask.dataframe` on the airline flight data from [here](http://stat-computing.org/dataexpo/2009/the-data.html). This contains data on 22 years of all flights inside the USA, including information such as origin and destination, delays, and cancellations. The data has been extracted and decompressed into a folder of csv files, totalling around 11 GB, and 121 million rows." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "121M\tairline_data/1987.csv\n", "478M\tairline_data/1988.csv\n", "464M\tairline_data/1989.csv\n", "486M\tairline_data/1990.csv\n", "468M\tairline_data/1991.csv\n", "470M\tairline_data/1992.csv\n", "468M\tairline_data/1993.csv\n", "478M\tairline_data/1994.csv\n", "506M\tairline_data/1995.csv\n", "509M\tairline_data/1996.csv\n", "515M\tairline_data/1997.csv\n", "513M\tairline_data/1998.csv\n", "527M\tairline_data/1999.csv\n", "544M\tairline_data/2000.csv\n", "573M\tairline_data/2001.csv\n", "506M\tairline_data/2002.csv\n", "598M\tairline_data/2003.csv\n", "639M\tairline_data/2004.csv\n", "640M\tairline_data/2005.csv\n", "641M\tairline_data/2006.csv\n", "670M\tairline_data/2007.csv\n", "657M\tairline_data/2008.csv\n", " 11G\ttotal\n" ] } ], "source": [ "%%bash\n", "du -hc airline_data/*.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Open with dask.dataframe" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can load all the files into a `dask.dataframe` using a clone of the `read_csv` method from pandas. \n", "\n", "Note that dask determines the datatype of each column by reading the first 1000 rows of the first file. If a column has integers for 1000 rows, and then a `NaN`, it will error as the dtype was improperly guessed. As such, we need to pass in some specified datatypes for a few columns. We'll also rename the columns from the original `CamelCase` to `snake_case`, as the `snake_case` is more pythonic.\n", "\n", "``dask.dataframe.read_csv`` can take a glob of filenames, we'll only glob the first 3 years for now (the reason will become apparent later)." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import dask.dataframe as dd" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "cols = ['year', 'month', 'day_of_month', 'day_of_week', 'deptime', 'crs_deptime', 'arrtime', \n", " 'crs_arrtime', 'unique_carrier', 'flight_num', 'tail_num', 'actual_elapsed_time',\n", " 'crs_elapsed_time', 'air_time', 'arrdelay', 'depdelay', 'origin', 'dest', 'distance', \n", " 'taxi_in', 'taxi_out', 'cancelled', 'cancellation_code', 'diverted', 'carrier_delay',\n", " 'weather_delay', 'nas_delay', 'security_delay', 'late_aircraft_delay']\n", "\n", "dtypes = {'cancellation_code': object, 'taxi_in': float, 'taxi_out': float, 'cancelled': bool,\n", " 'diverted': bool, 'carrier_delay': float, 'weather_delay': float, 'nas_delay': float,\n", " 'security_delay': float, 'late_aircraft_delay': float, 'tail_num': object,\n", " 'crs_deptime': float, 'crs_arrtime': float, 'flight_num': float, 'crs_elapsed_time': float,\n", " 'distance': float}\n", "\n", "df = dd.read_csv('airline_data/198*.csv', header=0, names=cols, dtype=dtypes)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a `dask.dataframe.DataFrame` object. This looks a lot like a pandas `DataFrame`, and has many of the same methods:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
yearmonthday_of_monthday_of_weekdeptimecrs_deptimearrtimecrs_arrtimeunique_carrierflight_num...taxi_intaxi_outcancelledcancellation_codedivertedcarrier_delayweather_delaynas_delaysecurity_delaylate_aircraft_delay
0198710143741730912849PS1451...NaNNaNFalseNaNFalseNaNNaNNaNNaNNaN
1198710154729730903849PS1451...NaNNaNFalseNaNFalseNaNNaNNaNNaNNaN
2198710176741730918849PS1451...NaNNaNFalseNaNFalseNaNNaNNaNNaNNaN
3198710187729730847849PS1451...NaNNaNFalseNaNFalseNaNNaNNaNNaNNaN
4198710191749730922849PS1451...NaNNaNFalseNaNFalseNaNNaNNaNNaNNaN
\n", "

5 rows × 29 columns

\n", "
" ], "text/plain": [ " year month day_of_month day_of_week deptime crs_deptime arrtime \\\n", "0 1987 10 14 3 741 730 912 \n", "1 1987 10 15 4 729 730 903 \n", "2 1987 10 17 6 741 730 918 \n", "3 1987 10 18 7 729 730 847 \n", "4 1987 10 19 1 749 730 922 \n", "\n", " crs_arrtime unique_carrier flight_num ... taxi_in \\\n", "0 849 PS 1451 ... NaN \n", "1 849 PS 1451 ... NaN \n", "2 849 PS 1451 ... NaN \n", "3 849 PS 1451 ... NaN \n", "4 849 PS 1451 ... NaN \n", "\n", " taxi_out cancelled cancellation_code diverted carrier_delay \\\n", "0 NaN False NaN False NaN \n", "1 NaN False NaN False NaN \n", "2 NaN False NaN False NaN \n", "3 NaN False NaN False NaN \n", "4 NaN False NaN False NaN \n", "\n", " weather_delay nas_delay security_delay late_aircraft_delay \n", "0 NaN NaN NaN NaN \n", "1 NaN NaN NaN NaN \n", "2 NaN NaN NaN NaN \n", "3 NaN NaN NaN NaN \n", "4 NaN NaN NaN NaN \n", "\n", "[5 rows x 29 columns]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "year int64\n", "month int64\n", "day_of_month int64\n", "day_of_week int64\n", "deptime float64\n", "crs_deptime float64\n", "arrtime float64\n", "crs_arrtime float64\n", "unique_carrier object\n", "flight_num float64\n", "tail_num object\n", "actual_elapsed_time float64\n", "crs_elapsed_time float64\n", "air_time float64\n", "arrdelay float64\n", "depdelay float64\n", "origin object\n", "dest object\n", "distance float64\n", "taxi_in float64\n", "taxi_out float64\n", "cancelled bool\n", "cancellation_code object\n", "diverted bool\n", "carrier_delay float64\n", "weather_delay float64\n", "nas_delay float64\n", "security_delay float64\n", "late_aircraft_delay float64\n", "dtype: object" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dask `DataFrame` is partitioned into chunks along the index. To see how many partitions, you can use the `npartitions` attribute." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "34" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.npartitions" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "Many of the methods available in pandas are also available in dask. The difference is, expressions don't evaluate immediately, but instead build up a graph of the computation to be executed later. 
To finally run the computation, you need to call the `compute` method.\n", "\n", "Let's just create some expressions for now:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Maximum departure delay over the 3 years\n", "expr = df.depdelay.max()\n", "expr" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Maximum departure delay for flights leaving 'EWR'\n", "expr = df.depdelay[df.origin == 'EWR'].max()\n", "expr" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "dd.Series" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Mean departure delay for each airport\n", "expr = df.depdelay.groupby(df.origin).mean()\n", "expr" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "dd.Series" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Top 10 airports by mean departure delay\n", "expr = df.depdelay.groupby(df.origin).mean().nlargest(10)\n", "expr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "None of these expressions are actually computed yet - instead they have built up a graph expressing the computation. When the `compute` method is called, this graph will be executed by a scheduler in parallel (using threads by default). Because some operations in pandas release the [Global Interpreter Lock](https://wiki.python.org/moin/GlobalInterpreterLock), we can achieve some level of parallelism. The schedulers also try to minimize memory use by only loading data as it's needed. This allows us to work with data that is larger than the available RAM." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parsing csvs is slow..." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data is currently stored as csvs, which can be slow to parse. We can profile exactly how slow it is using the tools available in the [diagnostics](http://dask.pydata.org/en/latest/diagnostics.html) submodule. We'll also register a progress bar, so we can monitor our computations as they run." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ " \n", "\n", "\n", " \n", "\n", "
\n", " \n", " BokehJS successfully loaded.\n", "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize\n", "import bokeh.plotting as bp\n", "ProgressBar().register()\n", "bp.output_notebook()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we'll run the last expression we created above - finding the top 10 airports by mean departure delay from 1987-1989. We'll use both the `Profiler` (for profiling task timings) and the `ResourceProfiler` (for profiling CPU and memory usage)." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[########################################] | 100% Completed | 35.4s\n" ] } ], "source": [ "with Profiler() as prof, ResourceProfiler() as rprof:\n", " res = expr.compute()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "origin\n", "PIR 85.000000\n", "ABI 36.500000\n", "EGE 32.363636\n", "BFI 28.000000\n", "ROP 24.700000\n", "ACV 19.740223\n", "SUN 19.000000\n", "YAP 16.902482\n", "RDD 15.724234\n", "HDN 14.147350\n", "dtype: float64" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This took ~35 seconds for only 3 years worth. If we were running on the entire 22 year dataset, this would be far to slow for interactive work. To determine what's causing the slowdown, we can plot the task timings and resource usage together using the `visualize` command." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false, "scrolled": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "visualize([prof, rprof])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The top plot shows what task each worker (threads in this case) is executing over time. The tasks are color coded by function, and can be inspected by hovering over the rectangle with your mouse. As can be seen here, much of the time is spent in calls to `_read_csv`, which is a thin wrapper around `pandas.read_csv`. Only a small amount of time is spent on the actual computation (calls to `groupby`, `getitem`, etc...). \n", "\n", "The bottom plot shows CPU and memory usage over time. From this we can see that the calls to `_read_csv` aren't parallelizing well - indicating that `pandas.read_csv` holds the GIL. We're not getting much parallelism here, with spikes over 100% occuring only in the compute heavy portitions (`groupby`, etc...).\n", "\n", "These plots combined indicate that the storage format is our limiting factor here. Parsing csvs is slow, and also holds the GIL, so it doesn't parallelize. To improve performance, we'll have to change to another format." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Repeat with Castra" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Castra](https://github.com/blaze/castra) is a partitioned column store based on the [blosc](http://blosc.org/) compressor. It is designed to play well with `dask.dataframe`, improving load time and reducing IO costs. It is an experimental codebase, and not ready for production use. Here it serves to demonstrate the importance of storage format in computation speed.\n", "\n", "I created a castra from the airline data using code found [here](https://gist.github.com/jcrist/b5bfbf3be5ca8cf0c20d). In the process I also added an index along the departure date (converting the `'year'`, `'month'`, and `'day_of_month'` columns into a single `Timestamp`). The `'dest'`, `'origin'`, `'cancellation_code'`, and `'unique_carrier'` columns were also stored as [pandas categoricals](http://matthewrocklin.com/blog/work/2015/06/18/Categoricals/), as they are inherently categorical.\n", "\n", "To load the data from the castra into a dask `DataFrame`, we use the `from_castra` method." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": true }, "outputs": [], "source": [ "df = dd.from_castra('airport.castra/')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a `DataFrame` that holds all 22 years of the airline data. The only difference between it and the dataframe above is how the data will be loaded, but as we shall see below this makes a big difference in performance. \n", "\n", "Here we run the same computation again, except loading the data a castra instead of from csvs. Note that this is for all 22 years, instead of the 3 years we did earlier." 
] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[########################################] | 100% Completed | 47.8s\n" ] } ], "source": [ "with Profiler() as prof, ResourceProfiler() as rprof:\n", " top_avg_depdelay = df.depdelay.groupby(df.origin).mean().nlargest(10).compute()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "origin\n", "FMN 203.666667\n", "CYS 145.000000\n", "OGD 143.666667\n", "BFF 131.000000\n", "PIR 38.100000\n", "BFI 28.000000\n", "ACK 27.383212\n", "CKB 25.625000\n", "SOP 25.241042\n", "ADK 22.731978\n", "dtype: float64" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "top_avg_depdelay" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "visualize([prof, rprof])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Looking at the profile plot above, we can see a few big differences between it and the one for the csv files. The computation is no longer dominated by the tasks loading the data, and instead is roughly 50-50 loading and computation steps. Further, the CPU load is consistently greater than 100%, averaging around 125%. This shows that we're achieving some level of parallelism throughout. The execution time was also severely reduced - we processed 22 years of data in ~45 seconds. This is roughly 6 times faster than loading from csvs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cancelled flights by day" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With the data stored in a more accesible format, it becomes easier to do some interactive exploration of the dataset. Here we'll compute the number of cancelled flights each day over all 22 years. This is composed of two operations:\n", "- Selecting only the flights that are cancelled\n", "- Resampling over this series by day, counting all flights in each bin" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[########################################] | 100% Completed | 48.6s\n" ] } ], "source": [ "cancelled_by_day = df.cancelled[df.cancelled == True].resample('d', how='count').compute()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fig = bp.figure(x_axis_type='datetime', title='Cancelled flights by day', \n", " plot_height=300, plot_width=600)\n", "fig.line(x=cancelled_by_day.index, y=cancelled_by_day.values)\n", "fig.title_text_font_size = '16pt'\n", "fig.xaxis.axis_label = 'Date'\n", "fig.xaxis.axis_label_text_font_size = '12pt'\n", "fig.yaxis.axis_label = 'Cancelled Flights'\n", "fig.yaxis.axis_label_text_font_size = '12pt'\n", "bp.show(fig)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you zoom in you can see that the huge spike is 9/11/2001." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Shared sub-expressions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sometimes you may want to compute several outputs that may share sub-expressions. Instead of calling the `compute` method on each output, you can use the `dask.dataframe.compute` function, which computes multiple results, sharing intermediates. This improves efficiency, as the intermediate computations then only need to be done once.\n", "\n", "Here we'll select all flights out of EWR, and then compute the daily and weekly mean departure delays for these flights. By using the `compute` function, we only have to compute the selection of flights from EWR (`from_ewr` variable) once." ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[########################################] | 100% Completed | 42.4s\n" ] } ], "source": [ "from_ewr = df.depdelay[df.origin=='EWR']\n", "by_week = from_ewr.resample('W', how='mean')\n", "by_month = from_ewr.resample('M', how='mean')\n", "by_week, by_month = dd.compute(by_week, by_month)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fig = bp.figure(x_axis_type='datetime', title='Average flight delay out of EWR', \n", " plot_height=300, plot_width=600)\n", "fig.line(x=by_week.index, y=by_week.values, color='blue', alpha=0.3, legend='by week')\n", "fig.line(x=by_month.index, y=by_month.values, color='red', legend='by month')\n", "fig.title_text_font_size = '16pt'\n", "fig.xaxis.axis_label = 'Date'\n", "fig.xaxis.axis_label_text_font_size = '12pt'\n", "fig.yaxis.axis_label = 'Departure Delay (min)'\n", "fig.yaxis.axis_label_text_font_size = '12pt'\n", "bp.show(fig)" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Learning More" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is only a short overview of the features that `dask.dataframe` implements. To learn more:\n", "\n", "- See the dask documentation [here](http://dask.pydata.org/en/latest/)\n", "- Try out some examples online using [binder](http://mybinder.org/) [here](http://mybinder.org/repo/blaze/dask-examples)\n", "- Watch the [tutorial from PyData Seattle](https://www.youtube.com/watch?v=ieW3G7ZzRZ0), or one of the [many](https://www.youtube.com/watch?v=1kkFZ4P-XHg) [talks](https://www.youtube.com/watch?v=yDlCNjtZvLw) [given](https://www.youtube.com/watch?v=HLME2WKTJJ8) on dask at various conferences.\n", "- Follow our blog at [blaze.pydata.org](http://blaze.pydata.org/)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }