{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import codecs, json\n",
"import dask.dataframe as dd\n",
"import numpy as np\n",
"import pandas as pd\n",
"import pyarrow as pa\n",
"import pyarrow.parquet as pq"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# analysis year and the input/output locations derived from it\n",
"year = '2018'\n",
"data_dir = f'../data/{year}/'\n",
"file_name = f'chicago-crimes-{year}'"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Loading crime data from: ../data/2018/crimes-2018.snappy.parq\n",
"Crime data loaded into memory.\n",
"Crime data stats:\n",
"---------------------------------------\n",
"157,504 total records in 1 partitions\n",
"DataFrame size: 2,205,056\n",
"Wall time: 610 ms\n"
]
}
],
"source": [
"%%time\n",
"# input parquet file for the selected year (despite its name, this is a file path)\n",
"parquet_data_dir = f'{data_dir}crimes-{year}.snappy.parq'\n",
"print(f'Loading crime data from: {parquet_data_dir}')\n",
"\n",
"# open the parquet data lazily with Date as the index, then persist()\n",
"# so all partitions are loaded into memory once for the following cells\n",
"crimes = dd.read_parquet(parquet_data_dir, index='Date').persist()\n",
"print('Crime data loaded into memory.')\n",
"\n",
"# report row count, partition count and `size`, i.e. total cells (rows x columns)\n",
"print('Crime data stats:')\n",
"print('---------------------------------------')\n",
"print(f'{len(crimes):,} total records in {crimes.npartitions} partitions')\n",
"print(f'DataFrame size: {crimes.size.compute():,}')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
Dask DataFrame Structure:
\n",
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Block | \n",
" PrimaryType | \n",
" FBICode | \n",
" Description | \n",
" LocationDescription | \n",
" CommunityArea | \n",
" Beat | \n",
" District | \n",
" Ward | \n",
" Arrest | \n",
" Domestic | \n",
" Latitude | \n",
" Longitude | \n",
" Year | \n",
"
\n",
" \n",
" npartitions=1 | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" | \n",
" object | \n",
" int8 | \n",
" int8 | \n",
" int16 | \n",
" int8 | \n",
" int8 | \n",
" int16 | \n",
" int8 | \n",
" int8 | \n",
" bool | \n",
" bool | \n",
" float64 | \n",
" float64 | \n",
" int8 | \n",
"
\n",
" \n",
" | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
"
\n",
"
\n",
"Dask Name: read-parquet, 1 tasks
"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" Block PrimaryType FBICode Description LocationDescription CommunityArea Beat District Ward Arrest Domestic Latitude Longitude Year\n",
"npartitions=1 \n",
" object int8 int8 int16 int8 int8 int16 int8 int8 bool bool float64 float64 int8\n",
" ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
"Dask Name: read-parquet, 1 tasks"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# show the dask frame's lazy structure (column names and dtypes), not the data itself\n",
"crimes"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"All Crimes: 156385\n"
]
}
],
"source": [
"# subset to the fields used for mapping and drop rows with any missing value\n",
"crime_geo = (crimes[['PrimaryType', 'Block', 'Description',\n",
"                     'LocationDescription', 'CommunityArea',\n",
"                     'Arrest', 'Domestic',\n",
"                     'Latitude', 'Longitude', 'Ward']]\n",
"             .dropna())\n",
"print('All Crimes:', len(crime_geo))"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# converts crimes data to json\n",
"def to_json_file(file_path, data):\n",
"    \"\"\"Write `data` to `file_path` as compact, UTF-8 encoded JSON.\n",
"\n",
"    Args:\n",
"        file_path: destination file path; created or overwritten.\n",
"        data: any JSON-serializable object (here, a list of row lists).\n",
"\n",
"    `separators` drops the default spaces after ',' and ':', while\n",
"    `indent=0` still inserts a newline before every array element.\n",
"    \"\"\"\n",
"    # the with-block guarantees the file is flushed and closed, even if\n",
"    # json.dump raises; previously closing was left to garbage collection\n",
"    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:\n",
"        json.dump(data, json_file,\n",
"                  separators=(',', ':'), sort_keys=False, indent=0)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 5.81 s\n"
]
}
],
"source": [
"%%time\n",
"# dump the mapping columns as a plain JSON array of row arrays to gauge its raw size;\n",
"# `.values` carries only the columns, so the Date index is not part of the JSON\n",
"geo_data_columns = [\n",
"    'Latitude', 'Longitude', 'Block', 'LocationDescription',\n",
"    'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward',\n",
"]\n",
"to_json_file(f'{data_dir}{file_name}.json',\n",
"             crime_geo[geo_data_columns].compute().values.tolist())"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 486 ms\n"
]
}
],
"source": [
"%%time\n",
"# dish it out in snappy parquet for comparison; select the same columns as the\n",
"# JSON dump (crime_geo also carries CommunityArea) so the file sizes are comparable\n",
"crime_geo[geo_data_columns].to_parquet(data_dir + file_name + '.parquet', compression='SNAPPY')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00\n",
"Data columns (total 9 columns):\n",
"Latitude 156385 non-null float64\n",
"Longitude 156385 non-null float64\n",
"Block 156385 non-null object\n",
"LocationDescription 156385 non-null object\n",
"PrimaryType 156385 non-null object\n",
"Description 156385 non-null object\n",
"Arrest 156385 non-null bool\n",
"Domestic 156385 non-null bool\n",
"Ward 156385 non-null float64\n",
"dtypes: bool(2), float64(3), object(4)\n",
"memory usage: 9.8+ MB\n"
]
}
],
"source": [
"# create pandas dataframe for conversion to arrow: materialize the mapping columns\n",
"# in memory; Date remains the index (see the DatetimeIndex in the info() output)\n",
"crime_geo_df = crime_geo[geo_data_columns].compute()\n",
"crime_geo_df.info()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pyarrow.Table\n",
"Latitude: double\n",
"Longitude: double\n",
"Block: string\n",
"LocationDescription: string\n",
"PrimaryType: string\n",
"Description: string\n",
"Arrest: bool\n",
"Domestic: bool\n",
"Ward: double\n",
"Date: timestamp[ns]\n",
"metadata\n",
"--------\n",
"{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n",
" b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n",
" b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n",
" b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n",
" b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n",
" b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n",
" b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n",
" b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n",
" b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n",
" b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n",
" b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n",
" b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n",
" b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n",
" b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n",
" b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n",
" b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n",
" b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n",
" b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n",
" b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n",
" b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n",
" b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n",
" b': null}], \"pandas_version\": \"0.23.0\"}'}"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# convert pandas data frame to arrow table; the Date index becomes a regular\n",
"# timestamp column and is recorded under 'index_columns' in the pandas metadata\n",
"crime_geo_table = pa.Table.from_pandas(crime_geo_df)\n",
"crime_geo_table"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 173 ms\n"
]
}
],
"source": [
"%%time\n",
"# quick test: persist the arrow table as one parquet file\n",
"pq.write_table(crime_geo_table, f'{data_dir}{file_name}.parq')"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 11.7 ms\n"
]
}
],
"source": [
"%%time\n",
"# compatibility check: dask reads the arrow-written parquet file, with Date as the index\n",
"ddf = dd.read_parquet(f'{data_dir}{file_name}.parq', index='Date')"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"156,385 total records in 1 partitions\n",
"DataFrame size: 1,407,465\n"
]
},
{
"data": {
"text/html": [
"Dask DataFrame Structure:
\n",
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Latitude | \n",
" Longitude | \n",
" Block | \n",
" LocationDescription | \n",
" PrimaryType | \n",
" Description | \n",
" Arrest | \n",
" Domestic | \n",
" Ward | \n",
"
\n",
" \n",
" npartitions=1 | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" | \n",
" float64 | \n",
" float64 | \n",
" object | \n",
" object | \n",
" object | \n",
" object | \n",
" bool | \n",
" bool | \n",
" float64 | \n",
"
\n",
" \n",
" | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
"
\n",
"
\n",
"Dask Name: read-parquet, 1 tasks
"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" Latitude Longitude Block LocationDescription PrimaryType Description Arrest Domestic Ward\n",
"npartitions=1 \n",
" float64 float64 object object object object bool bool float64\n",
" ... ... ... ... ... ... ... ... ...\n",
"Dask Name: read-parquet, 1 tasks"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# same summary as for the source data: rows, partitions and cells (rows x columns)\n",
"print(f'{len(ddf):,} total records in {ddf.npartitions} partitions')\n",
"print(f'DataFrame size: {ddf.size.compute():,}')\n",
"ddf"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 75.2 ms\n"
]
}
],
"source": [
"%%time\n",
"# load the same parquet file straight into an arrow table\n",
"table = pq.read_table(f'{data_dir}{file_name}.parq')"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pyarrow.Table\n",
"Latitude: double\n",
"Longitude: double\n",
"Block: string\n",
"LocationDescription: string\n",
"PrimaryType: string\n",
"Description: string\n",
"Arrest: bool\n",
"Domestic: bool\n",
"Ward: double\n",
"Date: timestamp[us]\n",
"metadata\n",
"--------\n",
"{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n",
" b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n",
" b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n",
" b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n",
" b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n",
" b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n",
" b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n",
" b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n",
" b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n",
" b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n",
" b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n",
" b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n",
" b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n",
" b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n",
" b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n",
" b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n",
" b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n",
" b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n",
" b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n",
" b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n",
" b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n",
" b': null}], \"pandas_version\": \"0.23.0\"}'}"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# note: after the parquet round trip Date is timestamp[us] (it was timestamp[ns] before writing)\n",
"table"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 63.5 ms\n"
]
}
],
"source": [
"%%time\n",
"# convert it to pandas data frame; the stored pandas metadata lets Date come back as the index\n",
"df = table.to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00\n",
"Data columns (total 9 columns):\n",
"Latitude 156385 non-null float64\n",
"Longitude 156385 non-null float64\n",
"Block 156385 non-null object\n",
"LocationDescription 156385 non-null object\n",
"PrimaryType 156385 non-null object\n",
"Description 156385 non-null object\n",
"Arrest 156385 non-null bool\n",
"Domestic 156385 non-null bool\n",
"Ward 156385 non-null float64\n",
"dtypes: bool(2), float64(3), object(4)\n",
"memory usage: 9.8+ MB\n"
]
}
],
"source": [
"# compare with crime_geo_df.info() from before the parquet round trip\n",
"df.info()"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 265 ms\n"
]
}
],
"source": [
"%%time\n",
"# write the table to disk in the arrow IPC *file* (random-access) format, not the stream format.\n",
"# An explicit OSFile sink in a with-block guarantees the OS handle is released even if\n",
"# writing fails; writer.close() finalizes the file (footer) before the sink is closed.\n",
"with pa.OSFile(data_dir + file_name + '.arrow', 'wb') as sink:\n",
"    writer = pa.RecordBatchFileWriter(sink, table.schema)\n",
"    writer.write_table(table)\n",
"    writer.close()"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 4.88 ms\n"
]
}
],
"source": [
"%%time\n",
"# read the arrow IPC file back; read_all() gathers every record batch into one table.\n",
"# NOTE(review): the reader is opened from a path and not closed explicitly; if pyarrow\n",
"# memory-maps it, read_table may reference that mapping - confirm before adding a with-block\n",
"reader = pa.RecordBatchFileReader(f'{data_dir}{file_name}.arrow')\n",
"read_table = reader.read_all()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pyarrow.Table\n",
"Latitude: double\n",
"Longitude: double\n",
"Block: string\n",
"LocationDescription: string\n",
"PrimaryType: string\n",
"Description: string\n",
"Arrest: bool\n",
"Domestic: bool\n",
"Ward: double\n",
"Date: timestamp[us]\n",
"metadata\n",
"--------\n",
"{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n",
" b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n",
" b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n",
" b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n",
" b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n",
" b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n",
" b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n",
" b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n",
" b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n",
" b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n",
" b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n",
" b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n",
" b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n",
" b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n",
" b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n",
" b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n",
" b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n",
" b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n",
" b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n",
" b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n",
" b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n",
" b': null}], \"pandas_version\": \"0.23.0\"}'}"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# schema and pandas metadata should match `table`, the source of the .arrow file\n",
"read_table"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}