{ "cells": [ { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import dask.dataframe as dd\n", "\n", "import matplotlib.pyplot as plt\n", "import seaborn as sns" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "pd.options.display.max_rows = 10" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 06:14 data/yellow_tripdata_2009-01.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.2G Sep 9 10:56 data/yellow_tripdata_2009-02.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:03 data/yellow_tripdata_2009-03.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:10 data/yellow_tripdata_2009-04.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.5G Sep 9 11:17 data/yellow_tripdata_2009-05.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:23 data/yellow_tripdata_2009-06.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.3G Sep 9 11:30 data/yellow_tripdata_2009-07.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.3G Sep 9 11:36 data/yellow_tripdata_2009-08.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:44 data/yellow_tripdata_2009-09.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.6G Sep 9 11:52 data/yellow_tripdata_2009-10.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:59 data/yellow_tripdata_2009-11.csv\r\n", "-rw-r--r-- 1 taugspurger staff 2.5G Sep 9 12:07 data/yellow_tripdata_2009-12.csv\r\n" ] } ], "source": [ "ls -lh data/*.csv" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1min 4s, sys: 5.2 s, total: 1min 9s\n", "Wall time: 1min 9s\n" ] } ], "source": [ "%%time\n", "dtype = {\n", " 'vendor_name': 'category',\n", " 'Payment_Type': 'category',\n", "}\n", "\n", "df = pd.read_csv(\"data/yellow_tripdata_2009-01.csv\", dtype=dtype,\n", " parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
vendor_nameTrip_Pickup_DateTimeTrip_Dropoff_DateTimePassenger_CountTrip_DistanceStart_LonStart_LatRate_Codestore_and_forwardEnd_LonEnd_LatPayment_TypeFare_Amtsurchargemta_taxTip_AmtTolls_AmtTotal_Amt
0VTS2009-01-04 02:52:002009-01-04 03:02:0012.63-73.99195740.721567NaNNaN-73.99380340.695922CASH8.90.5NaN0.000.09.40
1VTS2009-01-04 03:31:002009-01-04 03:38:0034.55-73.98210240.736290NaNNaN-73.95585040.768030Credit12.10.5NaN2.000.014.60
2VTS2009-01-03 15:43:002009-01-03 15:57:00510.35-74.00258740.739748NaNNaN-73.86998340.770225Credit23.70.0NaN4.740.028.44
3DDS2009-01-01 20:52:582009-01-01 21:14:0015.00-73.97426740.790955NaNNaN-73.99655840.731849CREDIT14.90.5NaN3.050.018.45
4DDS2009-01-24 16:18:232009-01-24 16:24:5610.40-74.00158040.719382NaNNaN-74.00837840.720350CASH3.70.0NaN0.000.03.70
\n", "
" ], "text/plain": [ " vendor_name Trip_Pickup_DateTime Trip_Dropoff_DateTime Passenger_Count \\\n", "0 VTS 2009-01-04 02:52:00 2009-01-04 03:02:00 1 \n", "1 VTS 2009-01-04 03:31:00 2009-01-04 03:38:00 3 \n", "2 VTS 2009-01-03 15:43:00 2009-01-03 15:57:00 5 \n", "3 DDS 2009-01-01 20:52:58 2009-01-01 21:14:00 1 \n", "4 DDS 2009-01-24 16:18:23 2009-01-24 16:24:56 1 \n", "\n", " Trip_Distance Start_Lon Start_Lat Rate_Code store_and_forward \\\n", "0 2.63 -73.991957 40.721567 NaN NaN \n", "1 4.55 -73.982102 40.736290 NaN NaN \n", "2 10.35 -74.002587 40.739748 NaN NaN \n", "3 5.00 -73.974267 40.790955 NaN NaN \n", "4 0.40 -74.001580 40.719382 NaN NaN \n", "\n", " End_Lon End_Lat Payment_Type Fare_Amt surcharge mta_tax Tip_Amt \\\n", "0 -73.993803 40.695922 CASH 8.9 0.5 NaN 0.00 \n", "1 -73.955850 40.768030 Credit 12.1 0.5 NaN 2.00 \n", "2 -73.869983 40.770225 Credit 23.7 0.0 NaN 4.74 \n", "3 -73.996558 40.731849 CREDIT 14.9 0.5 NaN 3.05 \n", "4 -74.008378 40.720350 CASH 3.7 0.0 NaN 0.00 \n", "\n", " Tolls_Amt Total_Amt \n", "0 0.0 9.40 \n", "1 0.0 14.60 \n", "2 0.0 28.44 \n", "3 0.0 18.45 \n", "4 0.0 3.70 " ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's predict whether or not the person tips. We'll keep it simple and just use a `LogisticRegression`." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": true }, "outputs": [], "source": [ "X = df.drop(\"Tip_Amt\", axis=1)\n", "y = df['Tip_Amt'] > 0" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import train_test_split" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": true }, "outputs": [], "source": [ "X_train, X_test, y_train, y_test = train_test_split(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "I notice that there are some minor differences in the spelling on \"Payment Type\":" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Index(['CASH', 'CREDIT', 'Cash', 'Credit', 'Dispute', 'No Charge'], dtype='object')" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.Payment_Type.cat.categories" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll consolidate those by just lower-casing them:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 cash\n", "1 credit\n", "2 credit\n", "3 credit\n", "4 cash\n", " ... \n", "14092408 cash\n", "14092409 credit\n", "14092410 cash\n", "14092411 cash\n", "14092412 credit\n", "Name: Payment_Type, Length: 14092413, dtype: object" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.Payment_Type.str.lower()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we'll want to ensure that all the DataFrames have a consistent set of categories (incidentally, this is what my [`CategoricalDtype` refactor](https://github.com/pandas-dev/pandas/pull/16015) is going solve a little more cleanly that what we'll have to do here). For now, we'll just use a `set_categories`.\n", "\n", "And since we're good sci-kittens, we'll package all this up in a pipeline." ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from sklearn.base import BaseEstimator, TransformerMixin\n", "from sklearn.pipeline import make_pipeline\n", "from sklearn.preprocessing import FunctionTransformer" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "class ColumnSelector(TransformerMixin):\n", " \"Select `columns` from `X`\"\n", " def __init__(self, columns):\n", " self.columns = columns\n", "\n", " def fit(self, X, y=None):\n", " return self\n", "\n", " def transform(self, X, y=None):\n", " return X[self.columns]\n", " \n", "\n", "class HourExtractor(TransformerMixin):\n", " \"Transform each datetime64 column in `columns` to integer hours\"\n", " def __init__(self, columns):\n", " self.columns = columns\n", "\n", " def fit(self, X, y=None):\n", " return self\n", "\n", " def transform(self, X, y=None):\n", " return X.assign(**{col: lambda x: x[col].dt.hour for col in self.columns})\n", "\n", "\n", "def payment_lowerer(X):\n", " \"\"\"Lowercase all the Payment_Type values\"\"\"\n", " return X.assign(Payment_Type=X.Payment_Type.str.lower())\n", "\n", "\n", "class CategoricalEncoder(TransformerMixin):\n", " \"\"\"Convert to Categorical with specific `categories`\"\"\"\n", " def __init__(self, categories):\n", " self.categories = categories\n", " \n", " def fit(self, X, y=None):\n", " return self\n", " \n", " def transform(self, X, y=None):\n", " for col, categories in self.categories.items():\n", " X[col] = X[col].astype('category').cat.set_categories(categories)\n", " return X\n", " \n", "class StandardScaler(TransformerMixin):\n", " \"Scale a subset of the columns in a DataFrame\"\n", " def __init__(self, columns):\n", " self.columns = columns\n", " \n", " def fit(self, X, y=None):\n", " self.μs = X[self.columns].mean()\n", " self.σs = X[self.columns].std()\n", " return self\n", "\n", " def transform(self, X, y=None):\n", " X = X.copy()\n", " X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)\n", " return X" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Pipeline(memory=None,\n", " steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False,\n", " func=, inv_kw_args=None,\n", " inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001,\n", " verbose=0, warm_start=False))])" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# The columns at the start of the pipeline\n", "columns = ['vendor_name', 'Trip_Pickup_DateTime',\n", " 'Passenger_Count', 'Trip_Distance',\n", " 'Payment_Type', 'Fare_Amt', 'surcharge']\n", "\n", "# The mapping of {column: set of categories}\n", "categories = {\n", " 'vendor_name': ['CMT', 'DDS', 'VTS'],\n", " 'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],\n", "}\n", "\n", "scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']\n", "\n", "pipe = make_pipeline(\n", " ColumnSelector(columns),\n", " HourExtractor(['Trip_Pickup_DateTime']),\n", " FunctionTransformer(payment_lowerer, validate=False),\n", " CategoricalEncoder(categories),\n", " FunctionTransformer(pd.get_dummies, validate=False),\n", " StandardScaler(scale),\n", " LogisticRegression(),\n", ")\n", "pipe" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('columnselector', <__main__.ColumnSelector at 0x1561b2dd8>),\n", " ('hourextractor', <__main__.HourExtractor at 0x1561b2278>),\n", " ('functiontransformer-1', FunctionTransformer(accept_sparse=False,\n", " func=, inv_kw_args=None,\n", " inverse_func=None, kw_args=None, pass_y='deprecated',\n", " validate=False)),\n", " ('categoricalencoder', <__main__.CategoricalEncoder at 0x1561b2668>),\n", " ('functiontransformer-2', FunctionTransformer(accept_sparse=False,\n", " func=, inv_kw_args=None,\n", " inverse_func=None, kw_args=None, pass_y='deprecated',\n", " validate=False)),\n", " ('standardscaler', <__main__.StandardScaler at 0x1561b2198>),\n", " ('logisticregression',\n", " LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,\n", " intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,\n", " penalty='l2', random_state=None, solver='liblinear', tol=0.0001,\n", " verbose=0, warm_start=False))]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pipe.steps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This may seem a bit overly-formal. You *could* just do this stuff imperatively." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 59.6 s, sys: 5.81 s, total: 1min 5s\n", "Wall time: 1min 7s\n" ] }, { "data": { "text/plain": [ "Pipeline(memory=None,\n", " steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False,\n", " func=, inv_kw_args=None,\n", " inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001,\n", " verbose=0, warm_start=False))])" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time pipe.fit(X_train, y_train)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.99314373342666018" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pipe.score(X_train, y_train)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.99316284730737436" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pipe.score(X_test, y_test)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def mkpipe():\n", " pipe = make_pipeline(\n", " ColumnSelector(columns),\n", " HourExtractor(['Trip_Pickup_DateTime']),\n", " FunctionTransformer(payment_lowerer, validate=False),\n", " CategoricalEncoder(categories),\n", " FunctionTransformer(pd.get_dummies, validate=False),\n", " StandardScaler(scale),\n", " LogisticRegression(),\n", " )\n", " return pipe" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Learning Curve" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from sklearn.model_selection import cross_val_score, train_test_split" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": true }, "outputs": [], "source": [ "X_train, X_test, y_train, y_test = train_test_split(\n", " X, y\n", ")" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 100\n", "Scoring for 100\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 1000\n", "Scoring for 1000\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 10000\n", "Scoring for 10000\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 100000\n", "Scoring for 100000\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 1000000\n", "Scoring for 1000000\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/model_selection/_split.py:2010: FutureWarning: From version 0.21, test_size will always complement train_size unless both are specified.\n", " FutureWarning)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Fitting for 10000000\n", "Scoring for 10000000\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n", "/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/scikit-learn/sklearn/base.py:114: DeprecationWarning: Estimator Pipeline modifies parameters in __init__. This behavior is deprecated as of 0.18 and support for this behavior will be removed in 0.20.\n", " % type(estimator).__name__, DeprecationWarning)\n" ] } ], "source": [ "Ns = [100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000]\n", "\n", "scores = []\n", "\n", "for n in Ns:\n", " pipe = mkpipe()\n", " sdf = df.sample(n=n)\n", " X_train2, _, y_train2, _ = train_test_split(X, y, train_size=n)\n", " print(f\"Fitting for {n}\")\n", " pipe.fit(X_train2, y_train2)\n", " print(f\"Scoring for {n}\")\n", " scores.append(cross_val_score(pipe, X_test, y_test))" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "1.4092413" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(df) / 10_000_000" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Scaling it Out" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import dask.dataframe as dd" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "%%time\n", "df = dd.read_csv(\"data/*.csv\", dtype=dtype,\n", " parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)\n", "\n", "X = df.drop(\"Tip_Amt\", axis=1)\n", "y = df['Tip_Amt'] > 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since the scikit-learn world isn't really \"dask-aware\" at the moment, we'll use the `map_partitions` method. This is a good escape hatch for dealing with non-daskified code." ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "collapsed": true }, "outputs": [], "source": [ "yhat = X.map_partitions(lambda x: pd.Series(pipe.predict_proba(x)[:, 1], name='yhat'),\n", " meta=('yhat', 'f8'))" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 17min 52s, sys: 2min 35s, total: 20min 27s\n", "Wall time: 8min 49s\n" ] } ], "source": [ "%time yhat.to_frame().to_parquet(\"data/predictions.parq\")" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['nyc-tlc/trip data/yellow_tripdata_2009-01.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-02.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-03.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-04.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-05.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-06.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-07.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-08.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-09.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-10.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-11.csv',\n", " 'nyc-tlc/trip data/yellow_tripdata_2009-12.csv']" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ " keys = [\n", " f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'\n", " for m in range(1, 13)\n", " ]\n", "\n", "keys" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "k\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.1" } }, "nbformat": 4, "nbformat_minor": 2 }