{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import codecs, json\n", "import dask.dataframe as dd\n", "import numpy as np\n", "import pandas as pd\n", "import pyarrow as pa\n", "import pyarrow.parquet as pq" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# year of crime data to process; the data directory and file name are built from it\n", "year = '2018'\n", "data_dir = ''.join(['../data/', year, '/'])\n", "file_name = ''.join(['chicago-crimes-', year])" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Loading crime data from: ../data/2018/crimes-2018.snappy.parq\n", "Crime data loaded into memory.\n", "Crime data stats:\n", "---------------------------------------\n", "157,504 total records in 1 partitions\n", "DataFrame size: 2,205,056\n", "Wall time: 610 ms\n" ] } ], "source": [ "%%time\n", "# path of the '.snappy.parq' input for the selected year\n", "parquet_data_dir = ''.join([data_dir, 'crimes-', year, '.snappy.parq'])\n", "print('Loading crime data from: {}'.format(parquet_data_dir))\n", "\n", "# read the parquet data as a dask frame indexed by Date and persist it in memory\n", "crimes = dd.read_parquet(parquet_data_dir, index='Date').persist()\n", "print('Crime data loaded into memory.')\n", "\n", "# report the record count, partition count and frame size\n", "print('Crime data stats:')\n", "print('---------------------------------------')\n", "stats_line = '{:,} total records in {} partitions'\n", "print(stats_line.format(len(crimes), crimes.npartitions))\n", "print('DataFrame size: {:,}'.format(crimes.size.compute()))" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
BlockPrimaryTypeFBICodeDescriptionLocationDescriptionCommunityAreaBeatDistrictWardArrestDomesticLatitudeLongitudeYear
npartitions=1
objectint8int8int16int8int8int16int8int8boolboolfloat64float64int8
..........................................
\n", "
\n", "
Dask Name: read-parquet, 1 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " Block PrimaryType FBICode Description LocationDescription CommunityArea Beat District Ward Arrest Domestic Latitude Longitude Year\n", "npartitions=1 \n", " object int8 int8 int16 int8 int8 int16 int8 int8 bool bool float64 float64 int8\n", " ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n", "Dask Name: read-parquet, 1 tasks" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "crimes" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "All Crimes: 156385\n" ] } ], "source": [ "# get crime geo data for mapping, drop na\n", "crime_geo = crimes[['PrimaryType',\n", " 'Block',\n", " 'Description',\n", " 'LocationDescription',\n", " 'CommunityArea',\n", " 'Arrest',\n", " 'Domestic',\n", " 'Latitude', \n", " 'Longitude',\n", " 'Ward']].dropna()\n", "print('All Crimes:', len(crime_geo))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def to_json_file(file_path, data):\n", "    \"\"\"Write `data` to `file_path` as UTF-8 encoded JSON.\n", "\n", "    Uses compact separators, does not sort keys, and passes `indent=0`,\n", "    which makes json.dump put each item on its own line without any\n", "    leading indentation.\n", "    \"\"\"\n", "    # the with-block closes (and so flushes) the file even if json.dump raises;\n", "    # an anonymous codecs.open(...) handle would only be closed by garbage collection\n", "    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:\n", "        json.dump(data, json_file,\n", "                  separators=(',', ':'), sort_keys=False, indent=0)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 5.81 s\n" ] } ], "source": [ "%%time\n", "# output crimes data in raw json to see how large it gets\n", "geo_data_columns = ['Latitude', 'Longitude', 'Block', 'LocationDescription', \n", " 'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward']\n", "to_json_file(data_dir + file_name + '.json', \n", " crime_geo[geo_data_columns].compute().values.tolist())" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 486 ms\n" ] } ], "source": [ "%%time\n", "# dish it out in snappy 
parquet for comparison\n", "crime_geo_parquet_path = data_dir + file_name + '.parquet'\n", "crime_geo.to_parquet(crime_geo_parquet_path, compression='SNAPPY')" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00\n", "Data columns (total 9 columns):\n", "Latitude 156385 non-null float64\n", "Longitude 156385 non-null float64\n", "Block 156385 non-null object\n", "LocationDescription 156385 non-null object\n", "PrimaryType 156385 non-null object\n", "Description 156385 non-null object\n", "Arrest 156385 non-null bool\n", "Domestic 156385 non-null bool\n", "Ward 156385 non-null float64\n", "dtypes: bool(2), float64(3), object(4)\n", "memory usage: 9.8+ MB\n" ] } ], "source": [ "# compute the selected geo columns into a pandas frame (input for the arrow conversion)\n", "geo_subset = crime_geo[geo_data_columns]\n", "crime_geo_df = geo_subset.compute()\n", "crime_geo_df.info()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyarrow.Table\n", "Latitude: double\n", "Longitude: double\n", "Block: string\n", "LocationDescription: string\n", "PrimaryType: string\n", "Description: string\n", "Arrest: bool\n", "Domestic: bool\n", "Ward: double\n", "Date: timestamp[ns]\n", "metadata\n", "--------\n", "{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n", " b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n", " b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n", " b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n", " b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n", " b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n", " b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n", " b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n", " 
b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n", " b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n", " b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n", " b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n", " b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n", " b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n", " b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n", " b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n", " b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n", " b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n", " b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n", " b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n", " b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n", " b': null}], \"pandas_version\": \"0.23.0\"}'}" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# build an arrow table from the pandas frame and display it\n", "crime_geo_table = pa.Table.from_pandas(crime_geo_df)\n", "crime_geo_table" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 173 ms\n" ] } ], "source": [ "%%time\n", "# quick test: store the arrow table as a single parquet file\n", "arrow_parquet_path = data_dir + file_name + '.parq'\n", "pq.write_table(crime_geo_table, arrow_parquet_path)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 11.7 ms\n" ] } ], "source": [ "%%time\n", "# compatibility check: open the parquet file written by arrow with dask\n", "ddf = dd.read_parquet(\n", "    data_dir + file_name + '.parq',\n", "    index='Date',\n", ")" ] },
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "156,385 total records in 1 partitions\n", "DataFrame size: 1,407,465\n" ] }, { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
LatitudeLongitudeBlockLocationDescriptionPrimaryTypeDescriptionArrestDomesticWard
npartitions=1
float64float64objectobjectobjectobjectboolboolfloat64
...........................
\n", "
\n", "
Dask Name: read-parquet, 1 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " Latitude Longitude Block LocationDescription PrimaryType Description Arrest Domestic Ward\n", "npartitions=1 \n", " float64 float64 object object object object bool bool float64\n", " ... ... ... ... ... ... ... ... ...\n", "Dask Name: read-parquet, 1 tasks" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# summarize the frame read back by dask: record/partition counts, then frame size\n", "summary_line = '{:,} total records in {} partitions'\n", "print(summary_line.format(len(ddf), ddf.npartitions))\n", "print('DataFrame size: {:,}'.format(ddf.size.compute()))\n", "ddf" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 75.2 ms\n" ] } ], "source": [ "%%time\n", "# load the same parquet file into an arrow table\n", "parq_path = data_dir + file_name + '.parq'\n", "table = pq.read_table(parq_path)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyarrow.Table\n", "Latitude: double\n", "Longitude: double\n", "Block: string\n", "LocationDescription: string\n", "PrimaryType: string\n", "Description: string\n", "Arrest: bool\n", "Domestic: bool\n", "Ward: double\n", "Date: timestamp[us]\n", "metadata\n", "--------\n", "{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n", " b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n", " b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n", " b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n", " b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n", " b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n", " b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n", " b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n", " b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n",
" b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n", " b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n", " b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n", " b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n", " b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n", " b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n", " b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n", " b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n", " b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n", " b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n", " b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n", " b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n", " b': null}], \"pandas_version\": \"0.23.0\"}'}" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "table" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 63.5 ms\n" ] } ], "source": [ "%%time\n", "# turn the arrow table into a pandas data frame\n", "df = table.to_pandas()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00\n", "Data columns (total 9 columns):\n", "Latitude 156385 non-null float64\n", "Longitude 156385 non-null float64\n", "Block 156385 non-null object\n", "LocationDescription 156385 non-null object\n", "PrimaryType 156385 non-null object\n", "Description 156385 non-null object\n", "Arrest 156385 non-null bool\n", "Domestic 156385 non-null bool\n", "Ward 156385 non-null 
float64\n", "dtypes: bool(2), float64(3), object(4)\n", "memory usage: 9.8+ MB\n" ] } ], "source": [ "df.info()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 265 ms\n" ] } ], "source": [ "%%time\n", "# write the arrow table to a binary .arrow file on disk\n", "writer = pa.RecordBatchFileWriter(data_dir + file_name + '.arrow', table.schema)\n", "try:\n", "    writer.write_table(table)\n", "finally:\n", "    # always close the writer so the file handle is released,\n", "    # even when write_table raises\n", "    writer.close()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wall time: 4.88 ms\n" ] } ], "source": [ "%%time\n", "# read back binary arrow file from disk\n", "reader = pa.RecordBatchFileReader(data_dir + file_name + '.arrow')\n", "read_table = reader.read_all()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyarrow.Table\n", "Latitude: double\n", "Longitude: double\n", "Block: string\n", "LocationDescription: string\n", "PrimaryType: string\n", "Description: string\n", "Arrest: bool\n", "Domestic: bool\n", "Ward: double\n", "Date: timestamp[us]\n", "metadata\n", "--------\n", "{b'pandas': b'{\"index_columns\": [\"Date\"], \"column_indexes\": [{\"name\": null, \"f'\n", " b'ield_name\": null, \"pandas_type\": \"unicode\", \"numpy_type\": \"objec'\n", " b't\", \"metadata\": {\"encoding\": \"UTF-8\"}}], \"columns\": [{\"name\": \"L'\n", " b'atitude\", \"field_name\": \"Latitude\", \"pandas_type\": \"float64\", \"n'\n", " b'umpy_type\": \"float64\", \"metadata\": null}, {\"name\": \"Longitude\", '\n", " b'\"field_name\": \"Longitude\", \"pandas_type\": \"float64\", \"numpy_type'\n", " b'\": \"float64\", \"metadata\": null}, {\"name\": \"Block\", \"field_name\":'\n", " b' \"Block\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"met'\n", " b'adata\": null}, {\"name\": \"LocationDescription\", \"field_name\": \"Lo'\n",
" b'cationDescription\", \"pandas_type\": \"unicode\", \"numpy_type\": \"obj'\n", " b'ect\", \"metadata\": null}, {\"name\": \"PrimaryType\", \"field_name\": \"'\n", " b'PrimaryType\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", '\n", " b'\"metadata\": null}, {\"name\": \"Description\", \"field_name\": \"Descri'\n", " b'ption\", \"pandas_type\": \"unicode\", \"numpy_type\": \"object\", \"metad'\n", " b'ata\": null}, {\"name\": \"Arrest\", \"field_name\": \"Arrest\", \"pandas_'\n", " b'type\": \"bool\", \"numpy_type\": \"bool\", \"metadata\": null}, {\"name\":'\n", " b' \"Domestic\", \"field_name\": \"Domestic\", \"pandas_type\": \"bool\", \"n'\n", " b'umpy_type\": \"bool\", \"metadata\": null}, {\"name\": \"Ward\", \"field_n'\n", " b'ame\": \"Ward\", \"pandas_type\": \"float64\", \"numpy_type\": \"float64\",'\n", " b' \"metadata\": null}, {\"name\": \"Date\", \"field_name\": \"Date\", \"pand'\n", " b'as_type\": \"datetime\", \"numpy_type\": \"datetime64[ns]\", \"metadata\"'\n", " b': null}], \"pandas_version\": \"0.23.0\"}'}" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "read_table" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }