In [1]:
# This notebook expects Modin and Ray to be installed, e.g. via `pip install modin[ray]`.
# For all the ways to install Modin, see the official documentation at:
# https://modin.readthedocs.io/en/latest/installation.html

# NOTE: this is a special version of the notebook that demonstrates cloud-cluster functionality.
# It requires extra packages: `pip install cloudpickle rpyc`.
# If your environment requires a proxy for SSH, expose it via the MODIN_SOCKS_PROXY environment variable;
# note that this requires ray >= 0.8.7 to work.
import modin.experimental.pandas as pd
from modin.experimental.cloud import create_cluster

Please note that some of these APIs deviate from pandas in order to provide improved performance.


In [2]:
# Column labels for the trips CSV, in file order. They are passed to
# `read_csv(names=...)` together with `header=None`, so the order matters.
columns_names = [
    # per-trip fields
    "trip_id",
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "store_and_fwd_flag",
    "rate_code_id",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "ehail_fee",
    "improvement_surcharge",
    "total_amount",
    "payment_type",
    "trip_type",
    "pickup",
    "dropoff",
    "cab_type",
    "precipitation",
    "snow_depth",
    "snowfall",
    "max_temperature",
    "min_temperature",
    "average_wind_speed",
    # pickup_* location fields
    "pickup_nyct2010_gid",
    "pickup_ctlabel",
    "pickup_borocode",
    "pickup_boroname",
    "pickup_ct2010",
    "pickup_boroct2010",
    "pickup_cdeligibil",
    "pickup_ntacode",
    "pickup_ntaname",
    "pickup_puma",
    # dropoff_* location fields
    "dropoff_nyct2010_gid",
    "dropoff_ctlabel",
    "dropoff_borocode",
    "dropoff_boroname",
    "dropoff_ct2010",
    "dropoff_boroct2010",
    "dropoff_cdeligibil",
    "dropoff_ntacode",
    "dropoff_ntaname",
    "dropoff_puma",
]

# Columns that read_csv should parse as datetimes.
parse_dates = ["pickup_datetime", "dropoff_datetime"]

In [3]:
# Enter the cluster context and load the trips CSV while it is active. The
# `remote` handle and `df` bound here are reused by the cells further down.
# NOTE(review): presumably the read executes on the cloud cluster rather than
# locally -- confirm against the modin.experimental.cloud documentation.
with create_cluster(
    'aws',
    '../../../aws_credentials',
    cluster_name="rayscale-test",
    region="eu-north-1",
    zone="eu-north-1b",
    image="ami-00e1e82d7d4ca80d3",
) as remote:
    # header=None: no row of the file is used as a header, so the column
    # labels come from `columns_names`.
    df = pd.read_csv(
        'https://modin-datasets.s3.amazonaws.com/trips_data.csv',
        names=columns_names,
        header=None,
        parse_dates=parse_dates,
    )

To monitor auto-scaling activity, you can run:

 ray exec /home/vnlitvinov/.modin/cloud/config-9aba3e92.yml 'tail -n 100 -f /tmp/ray/session_*/logs/monitor*'

To open a console on the cluster:

 ray attach /home/vnlitvinov/.modin/cloud/config-9aba3e92.yml

To get a remote shell to the cluster manually, run:

 ssh -o IdentitiesOnly=yes -i /home/vnlitvinov/.ssh/ray-autoscaler_2_eu-north-1.pem ubuntu@13.48.203.18






In [4]:
# Re-enter the cluster context and print a plain-text preview of the loaded frame.
with remote:
    print(df)

 trip_id vendor_id ... dropoff_ntaname dropoff_puma
0 1 2 ... NaN NaN
1 2 2 ... NaN NaN
2 3 2 ... NaN NaN
3 4 2 ... NaN NaN
4 5 2 ... NaN NaN
... ... ... ... ... ...
9995 9881 2 ... Hamilton Heights 3802.0
9996 9882 2 ... Washington Heights North 3801.0
9997 9883 2 ... East Harlem South 3804.0
9998 9884 2 ... Washington Heights South 3801.0
9999 9885 2 ... Lenox Hill-Roosevelt Island 3805.0

[10000 rows x 51 columns]


In [5]:
def q1(df):
    """Count the non-null ``cab_type`` values within each ``cab_type`` group.

    Parameters
    ----------
    df : DataFrame
        Frame containing a ``cab_type`` column.

    Returns
    -------
    Series
        Counts indexed by the distinct ``cab_type`` values.
    """
    by_cab_type = df.groupby("cab_type")
    return by_cab_type["cab_type"].count()
def q2(df):
    """Return the mean ``total_amount`` for each ``passenger_count`` value.

    The ``total_amount`` column is selected *before* aggregating. Averaging
    every column and slicing afterwards does needless work on wide frames and
    raises ``TypeError`` on string columns under pandas >= 2.0, where
    ``numeric_only`` defaults to ``False``.

    Parameters
    ----------
    df : DataFrame
        Frame containing ``passenger_count`` and ``total_amount`` columns.

    Returns
    -------
    DataFrame
        Two columns, ``passenger_count`` and ``total_amount``, one row per
        distinct ``passenger_count`` (``as_index=False`` keeps the key as a
        regular column).
    """
    return df.groupby("passenger_count", as_index=False)["total_amount"].mean()
def q3(df):
    """Count rows per (``passenger_count``, ``pickup_datetime``) pair.

    Parameters
    ----------
    df : DataFrame
        Frame containing ``passenger_count`` and ``pickup_datetime`` columns.

    Returns
    -------
    DataFrame
        The two grouping keys as regular columns plus the group size in a
        column labelled ``0`` (the default label ``reset_index`` gives an
        unnamed Series).
    """
    grouped = df.groupby(["passenger_count", "pickup_datetime"])
    group_sizes = grouped.size()
    return group_sizes.reset_index()
def q4(df):
    """Count rows per (passenger count, pickup year, integer trip distance).

    Builds a three-column frame from ``df`` -- ``passenger_count`` unchanged,
    the year of ``pickup_datetime``, and ``trip_distance`` cast to ``int64`` --
    then counts the rows in each combination.

    Parameters
    ----------
    df : DataFrame
        Frame containing ``passenger_count``, ``pickup_datetime`` (datetime
        dtype, since ``.dt.year`` is used) and ``trip_distance`` columns.

    Returns
    -------
    DataFrame
        The three keys plus the group size in column ``0``, sorted by year
        ascending and then by size descending.
    """
    group_keys = ["passenger_count", "pickup_datetime", "trip_distance"]
    transformed = pd.DataFrame(
        {
            "passenger_count": df["passenger_count"],
            "pickup_datetime": df["pickup_datetime"].dt.year,
            "trip_distance": df["trip_distance"].astype("int64"),
        }
    )
    counts = transformed.groupby(group_keys).size().reset_index()
    # Column 0 holds the group sizes produced by size().reset_index().
    return counts.sort_values(by=["pickup_datetime", 0], ascending=[True, False])

In [6]:
# Run each query function (q1-q4) on `df` inside the `remote` context and
# print its result. The loop body must be indented deeper than the `for`
# header; otherwise Python rejects the cell with an IndentationError.
with remote:
    for query in (q1, q2, q3, q4):
        print(query(df))

10000
 passenger_count total_amount
0 0 18.333333
1 1 15.258850
2 2 20.332356
3 3 13.748845
4 4 19.742688
5 5 14.786221
6 6 15.400085
 passenger_count pickup_datetime 0
0 0 2013-08-14 12:07:00 1
1 0 2013-08-14 12:37:00 1
2 0 2013-08-15 00:00:00 1
3 1 2013-08-01 08:14:37 1
4 1 2013-08-01 09:48:00 1
... ... ... ..
9909 6 2013-09-28 18:30:15 1
9910 6 2013-09-28 19:57:22 1
9911 6 2013-09-29 18:47:29 1
9912 6 2013-09-30 02:27:33 1
9913 6 2013-09-30 21:31:06 1

[9914 rows x 3 columns]
 passenger_count pickup_datetime trip_distance 0
2 1 2013 0 1991
3 1 2013 1 1270
4 1 2013 2 853
80 5 2013 0 551
81 5 2013 1 537
.. ... ... ... ...
77 4 2013 10 1
78 4 2013 11 1
79 4 2013 14 1
102 5 2013 28 1
115 6 2013 14 1

[116 rows x 4 columns]
