{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table>\n", "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr>\n", "<tr><td>0</td><td>application_1565867330247_0001</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td><td></td></tr>\n", "</table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from pyspark.sql import functions as F\n", "from pyspark.sql import types as T\n", "from pyspark.sql import Window, Row" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Source bucket\n", "s3_bucket = \"s3://polakowo-yelp2/\"" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Source data paths: Yelp\n", "yelp_dir = s3_bucket + \"yelp_dataset/\"\n", "\n", "business_path = yelp_dir + \"business.json\"\n", "review_path = yelp_dir + \"review.json\"\n", "user_path = yelp_dir + \"user.json\"\n", "checkin_path = yelp_dir + \"checkin.json\"\n", "tip_path = yelp_dir + \"tip.json\"\n", "photo_path = yelp_dir + \"photo.json\"" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# Source data paths: Demographics\n", "demo_dir = s3_bucket + \"demo_dataset/\"\n", "\n", "demo_path = demo_dir + \"us-cities-demographics.json\"" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Source data paths: Weather\n", "weather_dir = s3_bucket + \"weather_dataset/\"\n", "\n", "city_attr_path = weather_dir + \"city_attributes.csv\"\n", "weather_temp_path = weather_dir + \"temperature.csv\"\n", "weather_desc_path = weather_dir + \"weather_description.csv\"" ] }, { "cell_type": "code", "execution_count": 107, "metadata": {}, "outputs": [], "source": [ "# Target data paths\n", "staging_dir = s3_bucket + \"staging_data/\"\n", "\n", "business_attributes_path = staging_dir + \"business_attributes\"\n", "cities_path = staging_dir + \"cities\"\n", "addresses_path = staging_dir + \"addresses\"\n", "categories_path = staging_dir + \"categories\"\n", "business_categories_path = staging_dir + \"business_categories\"\n", "business_hours_path = staging_dir + \"business_hours\"\n", "businesses_path = staging_dir + \"businesses\"\n", "reviews_path = staging_dir + \"reviews\"\n", "users_path = staging_dir + \"users\"\n", "elite_years_path = staging_dir + \"elite_years\"\n", "friends_path = staging_dir + \"friends\"\n", "checkins_path = staging_dir + \"checkins\"\n", "tips_path = staging_dir + \"tips\"\n", "photos_path = staging_dir + \"photos\"\n", "city_weather_path = staging_dir + \"city_weather\"" ] }, { "cell_type": "code", "execution_count": 93, "metadata": {}, "outputs": [], "source": [ "def describe(df):\n", " # The total number of records\n", " print(\"--------------------------------------------\")\n", " print(\"count:\")\n", " print(df.count())\n", " \n", " # The number of null values for each column\n", " print(\"--------------------------------------------\")\n", " print(\"nulls:\")\n", " print(df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).collect())\n", " \n", " # The maximum number of chars for each string column (useful for table definition in Redshift)\n", " print(\"--------------------------------------------\")\n", " print(\"max_str_lengths:\")\n", " print({k: df.select(k).agg(F.max(F.length(k))).first() for k, v in df.dtypes if v.startswith('string')})\n", " \n", " # Print schema\n", " print(\"--------------------------------------------\")\n", " print(\"schema:\")\n", " df.printSchema()\n", " \n", " # Print first record\n", " print(\"--------------------------------------------\")\n", " print(\"example:\")\n", " 
print(df.first())" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def write_csv(df, path):\n", " # For tables with standardized text fields\n", " # CSV can be parsed and processed incrementally\n", " df.write\\\n", " .format('csv')\\\n", " .option(\"header\", \"true\")\\\n", " .option(\"delimiter\", \"\\t\")\\\n", " .mode('overwrite')\\\n", " .save(path)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def write_json(df, path, n_partitions):\n", " # JSON files have to be parsed as a whole by Amazon Redshift\n", " # Redshift accepts JSON files of max. 4MB size (-> smart partitioning)\n", " # What you gain from using JSON is stricter string handling\n", " df.repartition(n_partitions).write\\\n", " .format('json')\\\n", " .option(\"nullValue\", None)\\\n", " .mode('overwrite')\\\n", " .save(path)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "def write_parquet(df, path):\n", " # The main format for our output files, as it combines pros of CSV and JSON formats\n", " df.write.parquet(path, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# business.json" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [ "business_df = spark.read.json(business_path)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('nulls:', [Row(address=0, attributes=28836, business_id=0, categories=482, city=0, hours=44830, is_open=0, latitude=0, longitude=0, name=0, postal_code=0, review_count=0, stars=0, state=0)])\n", "('count:', 192609)\n", "schema:\n", "root\n", " |-- address: string (nullable = true)\n", " |-- attributes: struct (nullable = true)\n", " | |-- AcceptsInsurance: string (nullable = true)\n", " | |-- AgesAllowed: string (nullable = true)\n", " | |-- Alcohol: string (nullable = true)\n", " | |-- Ambience: string (nullable = true)\n", " | |-- BYOB: string (nullable = true)\n", " | |-- BYOBCorkage: string (nullable = true)\n", " | |-- BestNights: string (nullable = true)\n", " | |-- BikeParking: string (nullable = true)\n", " | |-- BusinessAcceptsBitcoin: string (nullable = true)\n", " | |-- BusinessAcceptsCreditCards: string (nullable = true)\n", " | |-- BusinessParking: string (nullable = true)\n", " | |-- ByAppointmentOnly: string (nullable = true)\n", " | |-- Caters: string (nullable = true)\n", " | |-- CoatCheck: string (nullable = true)\n", " | |-- Corkage: string (nullable = true)\n", " | |-- DietaryRestrictions: string (nullable = true)\n", " | |-- DogsAllowed: string (nullable = true)\n", " | |-- DriveThru: string (nullable = true)\n", " | |-- GoodForDancing: string (nullable = true)\n", " | |-- GoodForKids: string (nullable = true)\n", " | |-- GoodForMeal: string (nullable = true)\n", " | |-- HairSpecializesIn: string (nullable = true)\n", " | |-- HappyHour: string (nullable = true)\n", " | |-- HasTV: string (nullable = true)\n", " | |-- Music: string (nullable = true)\n", " | |-- NoiseLevel: string (nullable = true)\n", " | |-- Open24Hours: string (nullable = true)\n", " | |-- OutdoorSeating: string (nullable = true)\n", " | |-- RestaurantsAttire: string (nullable = true)\n", " | |-- RestaurantsCounterService: string (nullable = true)\n", " | |-- RestaurantsDelivery: string (nullable = true)\n", " | |-- RestaurantsGoodForGroups: string (nullable = true)\n", " | |-- RestaurantsPriceRange2: string 
(nullable = true)\n", " | |-- RestaurantsReservations: string (nullable = true)\n", " | |-- RestaurantsTableService: string (nullable = true)\n", " | |-- RestaurantsTakeOut: string (nullable = true)\n", " | |-- Smoking: string (nullable = true)\n", " | |-- WheelchairAccessible: string (nullable = true)\n", " | |-- WiFi: string (nullable = true)\n", " |-- business_id: string (nullable = true)\n", " |-- categories: string (nullable = true)\n", " |-- city: string (nullable = true)\n", " |-- hours: struct (nullable = true)\n", " | |-- Friday: string (nullable = true)\n", " | |-- Monday: string (nullable = true)\n", " | |-- Saturday: string (nullable = true)\n", " | |-- Sunday: string (nullable = true)\n", " | |-- Thursday: string (nullable = true)\n", " | |-- Tuesday: string (nullable = true)\n", " | |-- Wednesday: string (nullable = true)\n", " |-- is_open: long (nullable = true)\n", " |-- latitude: double (nullable = true)\n", " |-- longitude: double (nullable = true)\n", " |-- name: string (nullable = true)\n", " |-- postal_code: string (nullable = true)\n", " |-- review_count: long (nullable = true)\n", " |-- stars: double (nullable = true)\n", " |-- state: string (nullable = true)\n", "\n", "('example:', [Row(address=u'2818 E Camino Acequia Drive', attributes=Row(AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, Ambience=None, BYOB=None, BYOBCorkage=None, BestNights=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, BusinessParking=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DietaryRestrictions=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids=u'False', GoodForMeal=None, HairSpecializesIn=None, HappyHour=None, HasTV=None, Music=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None), business_id=u'1SWheh84yJXfytovILXOAQ', categories=u'Golf, Active Life', city=u'Phoenix', hours=None, is_open=0, latitude=33.5221425, longitude=-112.0184807, name=u'Arizona Biltmore Golf Club', postal_code=u'85016', review_count=5, stars=3.0, state=u'AZ')])" ] } ], "source": [ "describe(business_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Extract addresses into a separate table (required)\n", "- Extract business categories into a separate table (required)\n", "- Extract business attributes into a separate table (optional)\n", "- Extract business hours into a separate table (optional)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## business_attributes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unfold the deeply nested `attributes` field into a new table."
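, "\n", "\n", "The subsections below standardize each attribute type with Python UDFs. As a side note, the boolean case could equally be written with built-in column expressions, which avoids Python UDF serialization overhead on a table this size; a minimal, hypothetical sketch (not used further):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def parse_boolean_expr(c):\n", "    # Same semantics as the parse_boolean UDF below: 'True'/'False' become\n", "    # booleans, everything else (None, 'None') falls through to null\n", "    return F.when(F.col(c) == 'True', True).when(F.col(c) == 'False', False)\n", "\n", "# Usage would mirror the UDF version, e.g.:\n", "# business_attributes_df.withColumn(\"HasTV\", parse_boolean_expr(\"HasTV\"))"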
] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "business_attributes_df = business_df.select(\"business_id\", \"attributes.*\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Boolean fields" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(AcceptsInsurance=u'None'), Row(AcceptsInsurance=u'False'), Row(AcceptsInsurance=None), Row(AcceptsInsurance=u'True')]" ] } ], "source": [ "business_attributes_df.select(\"AcceptsInsurance\").distinct().collect()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "def parse_boolean(x):\n", " # Convert boolean strings to native boolean format\n", " if x is None or x == 'None':\n", " return None\n", " if x == 'True':\n", " return True\n", " if x == 'False':\n", " return False\n", "\n", "parse_boolean_udf = F.udf(parse_boolean, T.BooleanType())\n", "\n", "bool_attrs = [\n", " \"AcceptsInsurance\",\n", " \"BYOB\",\n", " \"BikeParking\", \n", " \"BusinessAcceptsBitcoin\", \n", " \"BusinessAcceptsCreditCards\",\n", " \"ByAppointmentOnly\", \n", " \"Caters\", \n", " \"CoatCheck\", \n", " \"Corkage\", \n", " \"DogsAllowed\",\n", " \"DriveThru\", \n", " \"GoodForDancing\", \n", " \"GoodForKids\",\n", " \"HappyHour\", \n", " \"HasTV\",\n", " \"Open24Hours\", \n", " \"OutdoorSeating\", \n", " \"RestaurantsCounterService\", \n", " \"RestaurantsDelivery\", \n", " \"RestaurantsGoodForGroups\", \n", " \"RestaurantsReservations\", \n", " \"RestaurantsTableService\", \n", " \"RestaurantsTakeOut\",\n", " \"WheelchairAccessible\"\n", "]\n", "\n", "for attr in bool_attrs:\n", " business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_udf(attr))" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(AcceptsInsurance=None), Row(AcceptsInsurance=True), Row(AcceptsInsurance=False)]" ] } ], "source": [ "business_attributes_df.select(\"AcceptsInsurance\").distinct().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### String fields" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(AgesAllowed=u'None'), Row(AgesAllowed=None), Row(AgesAllowed=u\"u'18plus'\"), Row(AgesAllowed=u\"u'21plus'\"), Row(AgesAllowed=u\"u'allages'\"), Row(AgesAllowed=u\"u'19plus'\")]" ] } ], "source": [ "business_attributes_df.select(\"AgesAllowed\").distinct().collect()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "def parse_string(x):\n", " # Clean and standardize strings\n", " if x is None or x == '':\n", " return None\n", " # Some strings are of format u\"u'string'\"\n", " return x.replace(\"u'\", \"\").replace(\"'\", \"\").lower()\n", " \n", "parse_string_udf = F.udf(parse_string, T.StringType())\n", "\n", "str_attrs = [\n", " \"AgesAllowed\", \n", " \"Alcohol\",\n", " \"BYOBCorkage\",\n", " \"NoiseLevel\",\n", " \"RestaurantsAttire\",\n", " \"Smoking\",\n", " \"WiFi\",\n", "]\n", "\n", "for attr in str_attrs:\n", " business_attributes_df = business_attributes_df.withColumn(attr, parse_string_udf(attr))" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(AgesAllowed=u'none'), Row(AgesAllowed=u'19plus'), Row(AgesAllowed=None), 
Row(AgesAllowed=u'allages'), Row(AgesAllowed=u'21plus'), Row(AgesAllowed=u'18plus')]" ] } ], "source": [ "business_attributes_df.select(\"AgesAllowed\").distinct().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Integer fields" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(RestaurantsPriceRange2=u'3'), Row(RestaurantsPriceRange2=u'None'), Row(RestaurantsPriceRange2=None), Row(RestaurantsPriceRange2=u'1'), Row(RestaurantsPriceRange2=u'4'), Row(RestaurantsPriceRange2=u'2')]" ] } ], "source": [ "business_attributes_df.select(\"RestaurantsPriceRange2\").distinct().collect()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "def parse_integer(x):\n", " # Convert integers masked as strings to native integer format\n", " if x is None or x == 'None':\n", " return None\n", " return int(x)\n", " \n", "parse_integer_udf = F.udf(parse_integer, T.IntegerType())\n", "\n", "int_attrs = [\n", " \"RestaurantsPriceRange2\",\n", "]\n", "\n", "for attr in int_attrs:\n", " business_attributes_df = business_attributes_df.withColumn(attr, parse_integer_udf(attr))" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(RestaurantsPriceRange2=None), Row(RestaurantsPriceRange2=1), Row(RestaurantsPriceRange2=3), Row(RestaurantsPriceRange2=4), Row(RestaurantsPriceRange2=2)]" ] } ], "source": [ "business_attributes_df.select(\"RestaurantsPriceRange2\").distinct().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Boolean dictionary fields" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Ambience=u\"{'romantic': False, 'intimate': False, 'classy': False, 'hipster': False, 'divey': False, 'touristy': False, 'trendy': False, 'upscale': False, 'casual': True}\")" ] } ], "source": [ "business_attributes_df.select(\"business_id\", \"Ambience\").where(\"Ambience is not null\").first()" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import ast\n", "\n", "def parse_boolean_dict(x):\n", " # Convert dicts masked as strings to string:boolean format\n", " if x is None or x == 'None' or x == '':\n", " return None\n", " return ast.literal_eval(x)\n", "\n", "parse_boolean_dict_udf = F.udf(parse_boolean_dict, T.MapType(T.StringType(), T.BooleanType()))\n", "\n", "bool_dict_attrs = [\n", " \"Ambience\",\n", " \"BestNights\",\n", " \"BusinessParking\",\n", " \"DietaryRestrictions\",\n", " \"GoodForMeal\",\n", " \"HairSpecializesIn\",\n", " \"Music\"\n", "]\n", "\n", "for attr in bool_dict_attrs:\n", " business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_dict_udf(attr))\n", " # Get all keys of the MapType\n", " # [Row(key=u'romantic'), Row(key=u'casual'), ...\n", " key_rows = business_attributes_df.select(F.explode(attr)).select(\"key\").distinct().collect()\n", " # Convert each key into column (with proper name)\n", " exprs = [\"{}['{}'] as {}\".format(attr, row.key, attr+\"_\"+row.key.replace('-', '_')) for row in key_rows]\n", " business_attributes_df = business_attributes_df.selectExpr(\"*\", *exprs).drop(attr)" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", 
"text": [ "[Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Ambience_romantic=False, Ambience_casual=True, Ambience_trendy=False, Ambience_intimate=False, Ambience_hipster=False, Ambience_upscale=False, Ambience_divey=False, Ambience_touristy=False, Ambience_classy=False)]" ] } ], "source": [ "business_attributes_df.where(\"business_id = 'QXAEGFB4oINsVuTFxEYKFQ'\")\\\n", " .select(\"business_id\", *filter(lambda x: x.startswith(\"Ambience\"), business_attributes_df.columns))\\\n", " .collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Unload to S3 as parquet" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('nulls:', [Row(business_id=0, AcceptsInsurance=185359, AgesAllowed=192486, Alcohol=144146, BYOB=192581, BYOBCorkage=191186, BikeParking=107210, BusinessAcceptsBitcoin=179524, BusinessAcceptsCreditCards=79476, ByAppointmentOnly=145756, Caters=151981, CoatCheck=189103, Corkage=191947, DogsAllowed=185174, DriveThru=189417, GoodForDancing=187867, GoodForKids=126299, HappyHour=187403, HasTV=144511, NoiseLevel=148730, Open24Hours=192596, OutdoorSeating=137786, RestaurantsAttire=143970, RestaurantsCounterService=192598, RestaurantsDelivery=140087, RestaurantsGoodForGroups=137891, RestaurantsPriceRange2=84430, RestaurantsReservations=140322, RestaurantsTableService=175436, RestaurantsTakeOut=130532, Smoking=189111, WheelchairAccessible=172650, WiFi=142545, Ambience_romantic=145072, Ambience_casual=145072, Ambience_trendy=145072, Ambience_intimate=145072, Ambience_hipster=145695, Ambience_upscale=145260, Ambience_divey=152636, Ambience_touristy=145072, Ambience_classy=145072, BestNights_sunday=189140, BestNights_thursday=189140, BestNights_monday=189140, BestNights_wednesday=189140, BestNights_saturday=189140, BestNights_friday=189140, BestNights_tuesday=189140, BusinessParking_valet=91169, BusinessParking_lot=91177, BusinessParking_validated=91180, BusinessParking_garage=91159, BusinessParking_street=91177, DietaryRestrictions_kosher=192557, DietaryRestrictions_dairy_free=192557, DietaryRestrictions_vegan=192557, DietaryRestrictions_vegetarian=192557, DietaryRestrictions_gluten_free=192557, DietaryRestrictions_soy_free=192557, DietaryRestrictions_halal=192557, GoodForMeal_lunch=162897, GoodForMeal_brunch=162895, GoodForMeal_dinner=162897, GoodForMeal_latenight=162897, GoodForMeal_dessert=162897, GoodForMeal_breakfast=162897, HairSpecializesIn_curly=191634, HairSpecializesIn_asian=191779, HairSpecializesIn_perms=191634, HairSpecializesIn_africanamerican=191779, HairSpecializesIn_straightperms=191779, HairSpecializesIn_kids=191634, HairSpecializesIn_coloring=191634, HairSpecializesIn_extensions=191634, Music_no_music=188653, Music_dj=187617, Music_live=188087, Music_karaoke=188128, Music_video=188126, Music_background_music=188131, Music_jukebox=188102)])\n", "('count:', 192609)\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- AcceptsInsurance: string (nullable = true)\n", " |-- AgesAllowed: string (nullable = true)\n", " |-- Alcohol: string (nullable = true)\n", " |-- BYOB: string (nullable = true)\n", " |-- BYOBCorkage: string (nullable = true)\n", " |-- BikeParking: string (nullable = true)\n", " |-- BusinessAcceptsBitcoin: string (nullable = true)\n", " |-- BusinessAcceptsCreditCards: string (nullable = true)\n", " |-- ByAppointmentOnly: string (nullable = true)\n", " |-- Caters: string (nullable = true)\n", " |-- CoatCheck: string 
(nullable = true)\n", " |-- Corkage: string (nullable = true)\n", " |-- DogsAllowed: string (nullable = true)\n", " |-- DriveThru: string (nullable = true)\n", " |-- GoodForDancing: string (nullable = true)\n", " |-- GoodForKids: string (nullable = true)\n", " |-- HappyHour: string (nullable = true)\n", " |-- HasTV: string (nullable = true)\n", " |-- NoiseLevel: string (nullable = true)\n", " |-- Open24Hours: string (nullable = true)\n", " |-- OutdoorSeating: string (nullable = true)\n", " |-- RestaurantsAttire: string (nullable = true)\n", " |-- RestaurantsCounterService: string (nullable = true)\n", " |-- RestaurantsDelivery: string (nullable = true)\n", " |-- RestaurantsGoodForGroups: string (nullable = true)\n", " |-- RestaurantsPriceRange2: string (nullable = true)\n", " |-- RestaurantsReservations: string (nullable = true)\n", " |-- RestaurantsTableService: string (nullable = true)\n", " |-- RestaurantsTakeOut: string (nullable = true)\n", " |-- Smoking: string (nullable = true)\n", " |-- WheelchairAccessible: string (nullable = true)\n", " |-- WiFi: string (nullable = true)\n", " |-- Ambience_romantic: boolean (nullable = true)\n", " |-- Ambience_casual: boolean (nullable = true)\n", " |-- Ambience_trendy: boolean (nullable = true)\n", " |-- Ambience_intimate: boolean (nullable = true)\n", " |-- Ambience_hipster: boolean (nullable = true)\n", " |-- Ambience_upscale: boolean (nullable = true)\n", " |-- Ambience_divey: boolean (nullable = true)\n", " |-- Ambience_touristy: boolean (nullable = true)\n", " |-- Ambience_classy: boolean (nullable = true)\n", " |-- BestNights_sunday: boolean (nullable = true)\n", " |-- BestNights_thursday: boolean (nullable = true)\n", " |-- BestNights_monday: boolean (nullable = true)\n", " |-- BestNights_wednesday: boolean (nullable = true)\n", " |-- BestNights_saturday: boolean (nullable = true)\n", " |-- BestNights_friday: boolean (nullable = true)\n", " |-- BestNights_tuesday: boolean (nullable = true)\n", " |-- BusinessParking_valet: boolean (nullable = true)\n", " |-- BusinessParking_lot: boolean (nullable = true)\n", " |-- BusinessParking_validated: boolean (nullable = true)\n", " |-- BusinessParking_garage: boolean (nullable = true)\n", " |-- BusinessParking_street: boolean (nullable = true)\n", " |-- DietaryRestrictions_kosher: boolean (nullable = true)\n", " |-- DietaryRestrictions_dairy_free: boolean (nullable = true)\n", " |-- DietaryRestrictions_vegan: boolean (nullable = true)\n", " |-- DietaryRestrictions_vegetarian: boolean (nullable = true)\n", " |-- DietaryRestrictions_gluten_free: boolean (nullable = true)\n", " |-- DietaryRestrictions_soy_free: boolean (nullable = true)\n", " |-- DietaryRestrictions_halal: boolean (nullable = true)\n", " |-- GoodForMeal_lunch: boolean (nullable = true)\n", " |-- GoodForMeal_brunch: boolean (nullable = true)\n", " |-- GoodForMeal_dinner: boolean (nullable = true)\n", " |-- GoodForMeal_latenight: boolean (nullable = true)\n", " |-- GoodForMeal_dessert: boolean (nullable = true)\n", " |-- GoodForMeal_breakfast: boolean (nullable = true)\n", " |-- HairSpecializesIn_curly: boolean (nullable = true)\n", " |-- HairSpecializesIn_asian: boolean (nullable = true)\n", " |-- HairSpecializesIn_perms: boolean (nullable = true)\n", " |-- HairSpecializesIn_africanamerican: boolean (nullable = true)\n", " |-- HairSpecializesIn_straightperms: boolean (nullable = true)\n", " |-- HairSpecializesIn_kids: boolean (nullable = true)\n", " |-- HairSpecializesIn_coloring: boolean (nullable = true)\n", " |-- 
HairSpecializesIn_extensions: boolean (nullable = true)\n", " |-- Music_no_music: boolean (nullable = true)\n", " |-- Music_dj: boolean (nullable = true)\n", " |-- Music_live: boolean (nullable = true)\n", " |-- Music_karaoke: boolean (nullable = true)\n", " |-- Music_video: boolean (nullable = true)\n", " |-- Music_background_music: boolean (nullable = true)\n", " |-- Music_jukebox: boolean (nullable = true)\n", "\n", "('example:', [Row(business_id=u'1SWheh84yJXfytovILXOAQ', AcceptsInsurance=None, AgesAllowed=None, Alcohol=None, BYOB=None, BYOBCorkage=None, BikeParking=None, BusinessAcceptsBitcoin=None, BusinessAcceptsCreditCards=None, ByAppointmentOnly=None, Caters=None, CoatCheck=None, Corkage=None, DogsAllowed=None, DriveThru=None, GoodForDancing=None, GoodForKids=u'False', HappyHour=None, HasTV=None, NoiseLevel=None, Open24Hours=None, OutdoorSeating=None, RestaurantsAttire=None, RestaurantsCounterService=None, RestaurantsDelivery=None, RestaurantsGoodForGroups=None, RestaurantsPriceRange2=None, RestaurantsReservations=None, RestaurantsTableService=None, RestaurantsTakeOut=None, Smoking=None, WheelchairAccessible=None, WiFi=None, Ambience_romantic=None, Ambience_casual=None, Ambience_trendy=None, Ambience_intimate=None, Ambience_hipster=None, Ambience_upscale=None, Ambience_divey=None, Ambience_touristy=None, Ambience_classy=None, BestNights_sunday=None, BestNights_thursday=None, BestNights_monday=None, BestNights_wednesday=None, BestNights_saturday=None, BestNights_friday=None, BestNights_tuesday=None, BusinessParking_valet=None, BusinessParking_lot=None, BusinessParking_validated=None, BusinessParking_garage=None, BusinessParking_street=None, DietaryRestrictions_kosher=None, DietaryRestrictions_dairy_free=None, DietaryRestrictions_vegan=None, DietaryRestrictions_vegetarian=None, DietaryRestrictions_gluten_free=None, DietaryRestrictions_soy_free=None, DietaryRestrictions_halal=None, GoodForMeal_lunch=None, GoodForMeal_brunch=None, GoodForMeal_dinner=None, GoodForMeal_latenight=None, GoodForMeal_dessert=None, GoodForMeal_breakfast=None, HairSpecializesIn_curly=None, HairSpecializesIn_asian=None, HairSpecializesIn_perms=None, HairSpecializesIn_africanamerican=None, HairSpecializesIn_straightperms=None, HairSpecializesIn_kids=None, HairSpecializesIn_coloring=None, HairSpecializesIn_extensions=None, Music_no_music=None, Music_dj=None, Music_live=None, Music_karaoke=None, Music_video=None, Music_background_music=None, Music_jukebox=None)])" ] } ], "source": [ "describe(business_attributes_df)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [], "source": [ "write_parquet(business_attributes_df, business_attributes_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## cities" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Take `city` and `state_code` from the `business.json` and enrich them with demographics data." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Demographics" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [], "source": [ "demo_df = spark.read.json(demo_path)" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('nulls:', [Row(datasetid=0, fields=0, record_timestamp=0, recordid=0)])\n", "('count:', 2891)\n", "schema:\n", "root\n", " |-- datasetid: string (nullable = true)\n", " |-- fields: struct (nullable = true)\n", " | |-- average_household_size: double (nullable = true)\n", " | |-- city: string (nullable = true)\n", " | |-- count: long (nullable = true)\n", " | |-- female_population: long (nullable = true)\n", " | |-- foreign_born: long (nullable = true)\n", " | |-- male_population: long (nullable = true)\n", " | |-- median_age: double (nullable = true)\n", " | |-- number_of_veterans: long (nullable = true)\n", " | |-- race: string (nullable = true)\n", " | |-- state: string (nullable = true)\n", " | |-- state_code: string (nullable = true)\n", " | |-- total_population: long (nullable = true)\n", " |-- record_timestamp: string (nullable = true)\n", " |-- recordid: string (nullable = true)\n", "\n", "('example:', [Row(datasetid=u'us-cities-demographics', fields=Row(average_household_size=2.73, city=u'Newark', count=76402, female_population=143873, foreign_born=86253, male_population=138040, median_age=34.6, number_of_veterans=5829, race=u'White', state=u'New Jersey', state_code=u'NJ', total_population=281913), record_timestamp=u'1970-01-01T01:00:00+01:00', recordid=u'85458783ecf5da6572ee00e7120f68eff4fd0d61')])" ] } ], "source": [ "describe(demo_df)" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(city=u'Fontana', state_code=u'CA', count=5)" ] } ], "source": [ "demo_df.groupby(\"fields.city\", \"fields.state_code\").count().sort(F.col(\"count\").desc()).first()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Seems like this table isn't keyed by city and state code. Let's find the candidate key." ] }, { "cell_type": "code", "execution_count": 56, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(city=u'Asheville', state_code=u'NC', races=[u'White', u'Black or African-American', u'Asian', u'American Indian and Alaska Native', u'Hispanic or Latino'])" ] } ], "source": [ "demo_df.groupBy(\"fields.city\", \"fields.state_code\").agg(F.collect_list(F.col(\"fields.race\")).alias(\"races\")).first()" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(city=u'Jurupa Valley', state_code=u'CA', race=u'Black or African-American', count=1)" ] } ], "source": [ "demo_df.groupBy(\"fields.city\", \"fields.state_code\", \"fields.race\").count().sort(F.col(\"count\").desc()).first()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The candidate key is composed of `city`, `state_code` and `race` fields." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each JSON object here seems to describe (1) the demographics of the city and (2) the number of people belonging to some race (`race` and `count` fields). 
Since each record is unique by the `city`, `state_code`, and `race` fields, while the remaining demographic fields depend only on `city` and `state_code` (hence a lot of redundancy), we pivot the `race` column into one column per race value using the `pivot` function." ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [], "source": [ "def prepare_race(x):\n", " # Each race becomes a stand-alone column, so each race value needs a column-safe name\n", " return x.replace(\" \", \"_\").replace(\"-\", \"_\").lower()\n", " \n", "prepare_race_udf = F.udf(prepare_race, T.StringType())\n", "\n", "# Group by all columns except race and count and convert race rows into columns\n", "demo_df = demo_df.select(\"fields.*\")\\\n", " .withColumn(\"race\", prepare_race_udf(\"race\"))\n", "demo_df = demo_df.groupby(*set(demo_df.schema.names).difference(set([\"race\", \"count\"])))\\\n", " .pivot('race')\\\n", " .max('count')" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('nulls:', [Row(total_population=0, city=0, number_of_veterans=7, male_population=1, foreign_born=7, average_household_size=8, median_age=0, state=0, state_code=0, female_population=1, american_indian_and_alaska_native=57, asian=13, black_or_african_american=12, hispanic_or_latino=0, white=7)])\n", "('count:', 596)\n", "schema:\n", "root\n", " |-- total_population: long (nullable = true)\n", " |-- city: string (nullable = true)\n", " |-- number_of_veterans: long (nullable = true)\n", " |-- male_population: long (nullable = true)\n", " |-- foreign_born: long (nullable = true)\n", " |-- average_household_size: double (nullable = true)\n", " |-- median_age: double (nullable = true)\n", " |-- state: string (nullable = true)\n", " |-- state_code: string (nullable = true)\n", " |-- female_population: long (nullable = true)\n", " |-- american_indian_and_alaska_native: long (nullable = true)\n", " |-- asian: long (nullable = true)\n", " |-- black_or_african_american: long (nullable = true)\n", " |-- hispanic_or_latino: long (nullable = true)\n", " |-- white: long (nullable = true)\n", "\n", "('example:', [Row(total_population=1567442, city=u'Philadelphia', number_of_veterans=61995, male_population=741270, foreign_born=205339, average_household_size=2.61, median_age=34.1, state=u'Pennsylvania', state_code=u'PA', female_population=826172, american_indian_and_alaska_native=17500, asian=122721, black_or_african_american=691186, hispanic_or_latino=219038, white=688130)])" ] } ], "source": [ "describe(demo_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cities" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [], "source": [ "# Merge city data with demographics data\n", "cities_df = business_df.selectExpr(\"city\", \"state as state_code\")\\\n", " .distinct()\\\n", " .join(demo_df, [\"city\", \"state_code\"], how=\"left\")\\\n", " .withColumn(\"city_id\", F.monotonically_increasing_id())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Unload to S3 as parquet" ] }, { "cell_type": "code", "execution_count": 91, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "count:\n", "1258\n", "nulls:\n", "[Row(city=0, state_code=0, total_population=1210, number_of_veterans=1210, male_population=1210, foreign_born=1210, average_household_size=1210, median_age=1210, state=1210, female_population=1210, 
american_indian_and_alaska_native=1211, asian=1210, black_or_african_american=1210, hispanic_or_latino=1210, white=1210, city_id=0)]\n", "max_str_lengths:\n", "{'city': Row(max(length(city))=50), 'state': Row(max(length(state))=14), 'state_code': Row(max(length(state_code))=3)}\n", "schema:\n", "root\n", " |-- city: string (nullable = true)\n", " |-- state_code: string (nullable = true)\n", " |-- total_population: long (nullable = true)\n", " |-- number_of_veterans: long (nullable = true)\n", " |-- male_population: long (nullable = true)\n", " |-- foreign_born: long (nullable = true)\n", " |-- average_household_size: double (nullable = true)\n", " |-- median_age: double (nullable = true)\n", " |-- state: string (nullable = true)\n", " |-- female_population: long (nullable = true)\n", " |-- american_indian_and_alaska_native: long (nullable = true)\n", " |-- asian: long (nullable = true)\n", " |-- black_or_african_american: long (nullable = true)\n", " |-- hispanic_or_latino: long (nullable = true)\n", " |-- white: long (nullable = true)\n", " |-- city_id: long (nullable = false)\n", "\n", "example:\n", "Row(city=u'Mesa', state_code=u'AZ', total_population=471833, number_of_veterans=31808, male_population=234998, foreign_born=57492, average_household_size=2.68, median_age=36.9, state=u'Arizona', female_population=236835, american_indian_and_alaska_native=16044, asian=14608, black_or_african_american=22699, hispanic_or_latino=131425, white=413010, city_id=0)" ] } ], "source": [ "describe(cities_df)" ] }, { "cell_type": "code", "execution_count": 90, "metadata": {}, "outputs": [], "source": [ "write_parquet(cities_df, cities_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## addresses" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pull address information from `business.json`, but replace the city name with the newly created `city_id`."
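, "\n", "\n", "Note that `monotonically_increasing_id` is not deterministic across recomputations: if `cities_df` is re-evaluated during this join, the generated `city_id` values may differ from the ones already unloaded to S3. A hypothetical safeguard (not part of the original run) is to cache and materialize `cities_df` before reusing it:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Hypothetical safeguard: pin down the generated city_id values so the\n", "# parquet unload above and the join below see the same ids\n", "cities_df = cities_df.cache()\n", "cities_df.count()  # force evaluation"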
] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [], "source": [ "addresses_df = business_df.selectExpr(\"address\", \"latitude\", \"longitude\", \"postal_code\", \"city\", \"state as state_code\")\\\n", " .join(cities_df.select(\"city\", \"state_code\", \"city_id\"), [\"city\", \"state_code\"], how='left')\\\n", " .drop(\"city\", \"state_code\")\\\n", " .distinct()\\\n", " .withColumn(\"address_id\", F.monotonically_increasing_id())" ] }, { "cell_type": "code", "execution_count": 94, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "178763\n", "--------------------------------------------\n", "nulls:\n", "[Row(address=0, latitude=0, longitude=0, postal_code=0, city_id=0, address_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'postal_code': Row(max(length(postal_code))=8), 'address': Row(max(length(address))=118)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- address: string (nullable = true)\n", " |-- latitude: double (nullable = true)\n", " |-- longitude: double (nullable = true)\n", " |-- postal_code: string (nullable = true)\n", " |-- city_id: long (nullable = true)\n", " |-- address_id: long (nullable = false)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(address=u'3495 Lawrence Ave E', latitude=43.757291, longitude=-79.2293784, postal_code=u'M1H 1B2', city_id=8589934592, address_id=0)" ] } ], "source": [ "describe(addresses_df)" ] }, { "cell_type": "code", "execution_count": 66, "metadata": { "scrolled": true }, "outputs": [], "source": [ "write_parquet(addresses_df, addresses_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## categories" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, create a list of unique categories and assign each of them an id." 
] }, { "cell_type": "code", "execution_count": 95, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(business_id=u'1SWheh84yJXfytovILXOAQ', categories=u'Golf, Active Life')" ] } ], "source": [ "business_df.select(\"business_id\", \"categories\").first()" ] }, { "cell_type": "code", "execution_count": 68, "metadata": {}, "outputs": [], "source": [ "import re\n", "def parse_categories(categories):\n", " # Convert comma separated list of strings masked as a string into a native list type\n", " if categories is None:\n", " return []\n", " parsed = []\n", " # Some strings contain commas, so they have to be extracted beforehand\n", " require_attention = set([\"Wills, Trusts, & Probates\"])\n", " for s in require_attention:\n", " if categories.find(s) > -1:\n", " parsed.append(s)\n", " categories = categories.replace(s, \"\")\n", " return list(filter(None, parsed + re.split(r\",\\s*\", categories)))\n", " \n", "parse_categories_udf = F.udf(parse_categories, T.ArrayType(T.StringType()))\n", "business_categories_df = business_df.select(\"business_id\", \"categories\")\\\n", " .withColumn(\"categories\", parse_categories_udf(\"categories\"))" ] }, { "cell_type": "code", "execution_count": 69, "metadata": {}, "outputs": [], "source": [ "# Convert the list of categories in each row into a set of rows\n", "categories_df = business_categories_df.select(F.explode(\"categories\").alias(\"category\"))\\\n", " .dropDuplicates()\\\n", " .sort(\"category\")\\\n", " .withColumn(\"category_id\", F.monotonically_increasing_id())" ] }, { "cell_type": "code", "execution_count": 70, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('nulls:', [Row(category=0, category_id=0)])\n", "('count:', 1298)\n", "schema:\n", "root\n", " |-- category: string (nullable = true)\n", " |-- category_id: long (nullable = false)\n", "\n", "('example:', [Row(category=u'3D Printing', category_id=0)])" ] } ], "source": [ "describe(categories_df)" ] }, { "cell_type": "code", "execution_count": 71, "metadata": {}, "outputs": [], "source": [ "write_parquet(categories_df, categories_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## business_categories" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For each record in `business.json`, convert list of categories in `categories` field into rows of pairs `business_id`-`category_id`." 
] }, { "cell_type": "code", "execution_count": 96, "metadata": {}, "outputs": [], "source": [ "import re\n", "def zip_categories(business_id, categories):\n", " # For each value in categories, zip it with business_id to form a pair\n", " return list(zip([business_id] * len(categories), categories))\n", " \n", "zip_categories_udf = F.udf(zip_categories, T.ArrayType(T.ArrayType(T.StringType())))\n", "\n", "# Zip business_id's and categories and extract them into a new table called business_catagories\n", "business_categories_df = business_categories_df.select(F.explode(zip_categories_udf(\"business_id\", \"categories\")).alias(\"cols\"))\\\n", " .selectExpr(\"cols[0] as business_id\", \"cols[1] as category\")\\\n", " .dropDuplicates()\n", "business_categories_df = business_categories_df.join(categories_df, business_categories_df[\"category\"] == categories_df[\"category\"], how=\"left\")\\\n", " .drop(\"category\")" ] }, { "cell_type": "code", "execution_count": 97, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "788110\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, category_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- category_id: long (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'BeNBXXzqyaHtNQI0mW7EMg', category_id=128849018882)" ] } ], "source": [ "describe(business_categories_df)" ] }, { "cell_type": "code", "execution_count": 98, "metadata": {}, "outputs": [], "source": [ "write_parquet(business_categories_df, business_categories_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## business_hours" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To enable efficient querying based on business hours, for each day of week, split the time range string into \"from\" and \"to\" integers." 
] }, { "cell_type": "code", "execution_count": 99, "metadata": {}, "outputs": [], "source": [ "business_hours_df = business_df.select(\"business_id\", \"hours.*\")" ] }, { "cell_type": "code", "execution_count": 101, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Friday=u'9:0-1:0', Monday=u'9:0-0:0', Saturday=u'9:0-1:0', Sunday=u'9:0-0:0', Thursday=u'9:0-0:0', Tuesday=u'9:0-0:0', Wednesday=u'9:0-0:0')" ] } ], "source": [ "business_hours_df.where(\"Monday is not null\").first()" ] }, { "cell_type": "code", "execution_count": 103, "metadata": {}, "outputs": [], "source": [ "def parse_hours(x):\n", " # Take \"9:0-0:0\" (9am-00am) and transform it into {from: 900, to: 0}\n", " if x is None:\n", " return None\n", " convert_to_int = lambda x: int(x.split(':')[0]) * 100 + int(x.split(':')[1])\n", " return {\n", " \"from\": convert_to_int(x.split('-')[0]),\n", " \"to\": convert_to_int(x.split('-')[1])\n", " }\n", " \n", "parse_hours_udf = F.udf(parse_hours, T.StructType([\n", " T.StructField('from', T.IntegerType(), nullable=True),\n", " T.StructField('to', T.IntegerType(), nullable=True)\n", "]))\n", "\n", "hour_attrs = [\n", " \"Monday\",\n", " \"Tuesday\",\n", " \"Wednesday\",\n", " \"Thursday\",\n", " \"Friday\",\n", " \"Saturday\",\n", " \"Sunday\",\n", "]\n", "\n", "for attr in hour_attrs:\n", " business_hours_df = business_hours_df.withColumn(attr, parse_hours_udf(attr))\\\n", " .selectExpr(\"*\", attr+\".from as \"+attr+\"_from\", attr+\".to as \"+attr+\"_to\")\\\n", " .drop(attr)" ] }, { "cell_type": "code", "execution_count": 104, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(business_id=u'QXAEGFB4oINsVuTFxEYKFQ', Monday_from=900, Monday_to=0, Tuesday_from=900, Tuesday_to=0, Wednesday_from=900, Wednesday_to=0, Thursday_from=900, Thursday_to=0, Friday_from=900, Friday_to=100, Saturday_from=900, Saturday_to=100, Sunday_from=900, Sunday_to=0)" ] } ], "source": [ "business_hours_df.where(\"business_id = 'QXAEGFB4oINsVuTFxEYKFQ'\").first()" ] }, { "cell_type": "code", "execution_count": 106, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "192609\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, Monday_from=56842, Monday_to=56842, Tuesday_from=49181, Tuesday_to=49181, Wednesday_from=47452, Wednesday_to=47452, Thursday_from=46706, Thursday_to=46706, Friday_from=47435, Friday_to=47435, Saturday_from=66748, Saturday_to=66748, Sunday_from=101273, Sunday_to=101273)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- Monday_from: integer (nullable = true)\n", " |-- Monday_to: integer (nullable = true)\n", " |-- Tuesday_from: integer (nullable = true)\n", " |-- Tuesday_to: integer (nullable = true)\n", " |-- Wednesday_from: integer (nullable = true)\n", " |-- Wednesday_to: integer (nullable = true)\n", " |-- Thursday_from: integer (nullable = true)\n", " |-- Thursday_to: integer (nullable = true)\n", " |-- Friday_from: integer (nullable = true)\n", " |-- Friday_to: integer (nullable = true)\n", " |-- Saturday_from: integer (nullable = true)\n", " |-- Saturday_to: integer (nullable = true)\n", " |-- Sunday_from: integer (nullable = true)\n", " 
|-- Sunday_to: integer (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'1SWheh84yJXfytovILXOAQ', Monday_from=None, Monday_to=None, Tuesday_from=None, Tuesday_to=None, Wednesday_from=None, Wednesday_to=None, Thursday_from=None, Thursday_to=None, Friday_from=None, Friday_to=None, Saturday_from=None, Saturday_to=None, Sunday_from=None, Sunday_to=None)" ] } ], "source": [ "describe(business_hours_df)" ] }, { "cell_type": "code", "execution_count": 108, "metadata": {}, "outputs": [], "source": [ "write_parquet(business_hours_df, business_hours_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## businesses" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Write the remaining business information into the `businesses` table." ] }, { "cell_type": "code", "execution_count": 109, "metadata": {}, "outputs": [], "source": [ "businesses_df = business_df.join(addresses_df, (business_df[\"address\"] == addresses_df[\"address\"]) \n", " & (business_df[\"latitude\"] == addresses_df[\"latitude\"]) \n", " & (business_df[\"longitude\"] == addresses_df[\"longitude\"])\n", " & (business_df[\"postal_code\"] == addresses_df[\"postal_code\"]), how=\"left\")\\\n", " .selectExpr(\"business_id\", \"address_id\", \"cast(is_open as boolean)\", \"name\", \"review_count\", \"stars\")" ] }, { "cell_type": "code", "execution_count": 110, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "196728\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, address_id=0, is_open=0, name=0, review_count=0, stars=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'business_id': Row(max(length(business_id))=22), 'name': Row(max(length(name))=64)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- address_id: long (nullable = true)\n", " |-- is_open: boolean (nullable = true)\n", " |-- name: string (nullable = true)\n", " |-- review_count: long (nullable = true)\n", " |-- stars: double (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'nn8RDkUz0cWLcEhuga4f7Q', address_id=1271310320103, is_open=True, name=u'Handy AZ Man', review_count=19, stars=4.5)" ] } ], "source": [ "describe(businesses_df)" ] }, { "cell_type": "code", "execution_count": 111, "metadata": {}, "outputs": [], "source": [ "write_parquet(businesses_df, businesses_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# review.json" ] }, { "cell_type": "code", "execution_count": 112, "metadata": { "scrolled": true }, "outputs": [], "source": [ "review_df = spark.read.json(review_path)" ] }, { "cell_type": "code", "execution_count": 113, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "6685900\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, cool=0, date=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'date': Row(max(length(date))=19), 'text': Row(max(length(text))=5000), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22), 'review_id': Row(max(length(review_id))=22)}\n", "--------------------------------------------\n", 
"schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- cool: long (nullable = true)\n", " |-- date: string (nullable = true)\n", " |-- funny: long (nullable = true)\n", " |-- review_id: string (nullable = true)\n", " |-- stars: double (nullable = true)\n", " |-- text: string (nullable = true)\n", " |-- useful: long (nullable = true)\n", " |-- user_id: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'ujmEBvifdJM6h6RLv4wQIg', cool=0, date=u'2013-05-07 04:34:36', funny=1, review_id=u'Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text=u'Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id=u'hG7b0MtEbXx5QzbzE6C_VA')" ] } ], "source": [ "describe(review_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The table can be used as-is, only minor transformations required." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## reviews" ] }, { "cell_type": "code", "execution_count": 114, "metadata": {}, "outputs": [], "source": [ "# date field looks more like a timestamp\n", "reviews_df = review_df.withColumnRenamed(\"date\", \"ts\")\\\n", " .withColumn(\"ts\", F.to_timestamp(\"ts\"))" ] }, { "cell_type": "code", "execution_count": 115, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "6685900\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, cool=0, ts=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'text': Row(max(length(text))=5000), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22), 'review_id': Row(max(length(review_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- cool: long (nullable = true)\n", " |-- ts: timestamp (nullable = true)\n", " |-- funny: long (nullable = true)\n", " |-- review_id: string (nullable = true)\n", " |-- stars: double (nullable = true)\n", " |-- text: string (nullable = true)\n", " |-- useful: long (nullable = true)\n", " |-- user_id: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'ujmEBvifdJM6h6RLv4wQIg', cool=0, ts=datetime.datetime(2013, 5, 7, 4, 34, 36), funny=1, review_id=u'Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text=u'Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! 
Avoid Hospital ERs at all costs.', useful=6, user_id=u'hG7b0MtEbXx5QzbzE6C_VA')" ] } ], "source": [ "describe(reviews_df)" ] }, { "cell_type": "code", "execution_count": 116, "metadata": {}, "outputs": [], "source": [ "write_parquet(reviews_df, reviews_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# user.json" ] }, { "cell_type": "code", "execution_count": 117, "metadata": {}, "outputs": [], "source": [ "user_df = spark.read.json(user_path)" ] }, { "cell_type": "code", "execution_count": 118, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "1637138\n", "--------------------------------------------\n", "nulls:\n", "[Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, elite=0, fans=0, friends=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'yelping_since': Row(max(length(yelping_since))=19), 'friends': Row(max(length(friends))=359878), 'elite': Row(max(length(elite))=64), 'name': Row(max(length(name))=32), 'user_id': Row(max(length(user_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- average_stars: double (nullable = true)\n", " |-- compliment_cool: long (nullable = true)\n", " |-- compliment_cute: long (nullable = true)\n", " |-- compliment_funny: long (nullable = true)\n", " |-- compliment_hot: long (nullable = true)\n", " |-- compliment_list: long (nullable = true)\n", " |-- compliment_more: long (nullable = true)\n", " |-- compliment_note: long (nullable = true)\n", " |-- compliment_photos: long (nullable = true)\n", " |-- compliment_plain: long (nullable = true)\n", " |-- compliment_profile: long (nullable = true)\n", " |-- compliment_writer: long (nullable = true)\n", " |-- cool: long (nullable = true)\n", " |-- elite: string (nullable = true)\n", " |-- fans: long (nullable = true)\n", " |-- friends: string (nullable = true)\n", " |-- funny: long (nullable = true)\n", " |-- name: string (nullable = true)\n", " |-- review_count: long (nullable = true)\n", " |-- useful: long (nullable = true)\n", " |-- user_id: string (nullable = true)\n", " |-- yelping_since: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(average_stars=4.03, compliment_cool=1, compliment_cute=0, compliment_funny=1, compliment_hot=2, compliment_list=0, compliment_more=0, compliment_note=1, compliment_photos=0, compliment_plain=1, compliment_profile=0, compliment_writer=2, cool=25, elite=u'2015,2016,2017', fans=5, friends=u'c78V-rj8NQcQjOI8KP3UEA, alRMgPcngYSCJ5naFRBz5g, ajcnq75Z5xxkvUSmmJ1bCg, BSMAmp2-wMzCkhTfq9ToNg, jka10dk9ygX76hJG0gfPZQ, dut0e4xvme7QSlesOycHQA, l4l5lBnK356zBua7B-UJ6Q, 0HicMOOs-M_gl2eO-zES4Q, _uI57wL2fLyftrcSFpfSGQ, T4_Qd0YWbC3co6WSMw4vxg, iBRoLWPtWmsI1kdbE9ORSA, xjrUcid6Ymq0DoTJELkYyw, GqadWVzJ6At-vgLzK_SKgA, DvB13VJBmSnbFXBVBsKmDA, vRP9nQkYTeNioDjtxZlVhg, gT0A1iN3eeQ8EMAjJhwQtw, 6yCWjFPtp_AD4x93WAwmnw, 1dKzpNnib-JlViKv8_Gt5g, 3Bv4_JxHXq-gVLOxYMQX0Q, ikQyfu1iViYh8T0us7wiFQ, f1GGltNaB7K5DR1jf3dOmg, tgeFUChlh7v8bZFVl2-hjQ, -9-9oyXlqsMG2he5xIWdLQ, Adj9fBPVJad8vSs-mIP7gw, Ce49RY8CKXVsTifxRYFTsw, M1_7TLi8CbdA89nFLlH4iw, wFsNv-hqbW_F5-IRqfBN6g, 0Q1L7zXHocaUZ2gsG2XJeg, cBFgmOCBdhYa0xoFEAzp_g, 
VrD_AgiFvzqtlR15vir3SQ, cpE-7HK514Sr5vpSen9CEQ, F1UYelhPFB-zIKlt0ygIZg, CQAL1hvsLMCzuJf9AglsXw, 1KnY1wr15WfEWIRLB9IS6g, QWFQ-kXBiLbid-lm5Jr3dQ, nymT8liFugCrM16lTy0ZfQ, qj69bdd885heDvUPCyHd2Q, DySCZZcgbdrlHgEovk5y9w, lZMJIDuvhT9Dy4KyquLXyA, b_9Gn7wS93AoPZPR0dIJqQ, N07g1IaLh0_6sUjtiSRe4w, YdfPX_7DxSnKvvdCJ57iOw, 8GYryZPD22W7WgQ8kvMkEQ, cpQmAgOWatghp14h1pn1dQ, EnchhymLYMqftCRjqvVWmw, -JdfKhFktE7Zs9BMDFcPeQ, uWhC9eof98zPkvsalgaqJw, eyTlNDDaiPatfe6mheIZ0g, VfHq0o73aKsODvfAhwAQtg, kvD5tICngLAaQDujSFWupA, dXacwEhqi9-3_XT6JeH0Og, NfU0zDaTMEQ4-X9dbQWd9A, cTHWBdjSKbctSUIvWsgFxw, 3IEtCbSDF5t7RkZ20T6s9A, HJJXTrp6UybYyPdQ9DA0JA, JaXogQFVjzGRAeBvzamBHg, NUonfKkjS1iVqnNITtgXZg, D5vaJAYp0sOrGfsj9qvsMA, H27Ecbwwu4FGAlLgICourw, S8HrLmMiE4u8FWYWkNEoTw, Io36Y3xWQcIX9rYvPcYfXQ, J5mcqh8KxYpqjaLBNlwcig, -nTB3_08g06fD0GT8AtDBQ, wMpFA46lihK8oFns_5p65A, RZGFJHeomGJCWp3xcL3ejA, ZoQSzzXoSP1RxOD4Amv9Bg, qzM0EB0SkuuGIFv0adjQAQ, HuM6vvuveken-fPZ7d4olA, H3oukHpGpn9n_mJwSDSQyQ, PkmsJsQ8FIZe8eh8c_u96g, wSByVbwME4MzgkJaFyfvNg, YEVqknoDmrHAoUbHX0nPnA, li3vsK1XAPmeJYAUTYflHQ, MKc8yXi0glbPYt0Qb4PECw, fQPH6W9fksi27gkuUPnFaA, amrCMrDsoRetYFg2kwwdFA, 84dVQ6n6r2ezNaTuc7RkKA, yW9QjWY0olv5-uRKv3t_Kw, 5XJDj7c3eoidfQ3jW18Zgw, txSc6a6pIDctvwyBeu7Aqg, HFbbDCyyqP9xPkUlcxeIdg, hTUv5oh2do6Z3OppPuuiJA, gSqonG9J4fNM-fl_fE71AA, pd9mgTFpBTg5F9x-MsczNg, j3VE22V2GcHiH8UZxfFLfw, NYXlMW-T-3V4Jqr4r-i0Wg, btxgAZedxX8IWhMifA7Xkg, -Hp5mPLiRJNFnyeX5Ygzag, P6-DwVg6-t2JuQwIUEk0iQ, OI2TvxYvZrAodBG_RF53Xw, bHxf_VPKmZur1Bier-6A2A, Et_Sb39cVm81_Xe9HDM8ZQ, 5HwGl2UyYbaRq8aD6YC-fA, ZK228WMcCKLo5thcjD7rdw, iTf8wojwfm0NOi7dOiz3Nw, btYRxQYNJjpecflNHtFH0A, Kgo42FzpW_dXFgDKoewbtg, MNk_1Q_dqOY3xxHZKeO8VQ, AlwD504T9k0m5lkg3k5g6Q', funny=17, name=u'Rashmi', review_count=95, useful=84, user_id=u'l6BmjZMeQD3rDxWUbiAiow', yelping_since=u'2013-10-08 23:11:33')" ] } ], "source": [ "describe(user_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Extract elite into a separate table (required)\n", "- Extract friends into a separate table (required)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## users" ] }, { "cell_type": "code", "execution_count": 119, "metadata": {}, "outputs": [], "source": [ "# Drop fields which will be outsourced and cast timestamp field\n", "users_df = user_df.drop(\"elite\", \"friends\")\\\n", " .withColumn(\"yelping_since\", F.to_timestamp(\"yelping_since\"))" ] }, { "cell_type": "code", "execution_count": 120, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "1637138\n", "--------------------------------------------\n", "nulls:\n", "[Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, fans=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'user_id': Row(max(length(user_id))=22), 'name': Row(max(length(name))=32)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- average_stars: double (nullable = true)\n", " |-- compliment_cool: long (nullable = true)\n", " |-- compliment_cute: long (nullable = true)\n", " |-- compliment_funny: long (nullable = true)\n", " |-- compliment_hot: long (nullable = true)\n", " |-- compliment_list: long (nullable = true)\n", " |-- compliment_more: long (nullable = 
true)\n", " |-- compliment_note: long (nullable = true)\n", " |-- compliment_photos: long (nullable = true)\n", " |-- compliment_plain: long (nullable = true)\n", " |-- compliment_profile: long (nullable = true)\n", " |-- compliment_writer: long (nullable = true)\n", " |-- cool: long (nullable = true)\n", " |-- fans: long (nullable = true)\n", " |-- funny: long (nullable = true)\n", " |-- name: string (nullable = true)\n", " |-- review_count: long (nullable = true)\n", " |-- useful: long (nullable = true)\n", " |-- user_id: string (nullable = true)\n", " |-- yelping_since: timestamp (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(average_stars=4.03, compliment_cool=1, compliment_cute=0, compliment_funny=1, compliment_hot=2, compliment_list=0, compliment_more=0, compliment_note=1, compliment_photos=0, compliment_plain=1, compliment_profile=0, compliment_writer=2, cool=25, fans=5, funny=17, name=u'Rashmi', review_count=95, useful=84, user_id=u'l6BmjZMeQD3rDxWUbiAiow', yelping_since=datetime.datetime(2013, 10, 8, 23, 11, 33))" ] } ], "source": [ "describe(users_df)" ] }, { "cell_type": "code", "execution_count": 121, "metadata": {}, "outputs": [], "source": [ "write_parquet(users_df, users_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## elite_years" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The field `elite` is a comma-separated list of strings masked as a string. Make a separate table out of it." ] }, { "cell_type": "code", "execution_count": 122, "metadata": {}, "outputs": [], "source": [ "elite_years_df = user_df.select(\"user_id\", \"elite\")\\\n", " .withColumn(\"year\", F.explode(F.split(F.col(\"elite\"), \",\")))\\\n", " .where(\"year != '' and year is not null\")\\\n", " .select(F.col(\"user_id\"), F.col(\"year\").cast(\"integer\"))" ] }, { "cell_type": "code", "execution_count": 123, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "224499\n", "--------------------------------------------\n", "nulls:\n", "[Row(user_id=0, year=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'user_id': Row(max(length(user_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- user_id: string (nullable = true)\n", " |-- year: integer (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(user_id=u'l6BmjZMeQD3rDxWUbiAiow', year=2015)" ] } ], "source": [ "describe(elite_years_df)" ] }, { "cell_type": "code", "execution_count": 124, "metadata": {}, "outputs": [], "source": [ "write_parquet(elite_years_df, elite_years_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## friends" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Basically the same procedure as `elite` to get the table of user relationships. Can take some time." 
] }, { "cell_type": "code", "execution_count": 125, "metadata": {}, "outputs": [], "source": [ "friends_df = user_df.select(\"user_id\", \"friends\")\\\n", " .withColumn(\"friend_id\", F.explode(F.split(F.col(\"friends\"), \", \")))\\\n", " .where(\"friend_id != '' and friend_id is not null\")\\\n", " .select(F.col(\"user_id\"), F.col(\"friend_id\"))\\\n", " .distinct()" ] }, { "cell_type": "code", "execution_count": 126, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "75531114\n", "--------------------------------------------\n", "nulls:\n", "[Row(user_id=0, friend_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'friend_id': Row(max(length(friend_id))=22), 'user_id': Row(max(length(user_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- user_id: string (nullable = true)\n", " |-- friend_id: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(user_id=u'l6BmjZMeQD3rDxWUbiAiow', friend_id=u'wMpFA46lihK8oFns_5p65A')" ] } ], "source": [ "describe(friends_df)" ] }, { "cell_type": "code", "execution_count": 127, "metadata": {}, "outputs": [], "source": [ "write_parquet(friends_df, friends_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# checkin.json" ] }, { "cell_type": "code", "execution_count": 128, "metadata": {}, "outputs": [], "source": [ "checkin_df = spark.read.json(checkin_path)" ] }, { "cell_type": "code", "execution_count": 129, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "161950\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, date=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'date': Row(max(length(date))=3004279), 'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- date: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'--1UhMGODdWsrMastO9DZw', date=u'2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')" ] } ], "source": [ "describe(checkin_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## checkins" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Basically the same procedure as `friends` to get the table of pairs `business_id`:`ts`." 
] }, { "cell_type": "code", "execution_count": 130, "metadata": {}, "outputs": [], "source": [ "checkins_df = checkin_df.selectExpr(\"business_id\", \"date as ts\")\\\n", " .withColumn(\"ts\", F.explode(F.split(F.col(\"ts\"), \", \")))\\\n", " .where(\"ts != '' and ts is not null\")\\\n", " .withColumn(\"ts\", F.to_timestamp(\"ts\"))" ] }, { "cell_type": "code", "execution_count": 131, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "19089148\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, ts=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- ts: timestamp (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'--1UhMGODdWsrMastO9DZw', ts=datetime.datetime(2016, 4, 26, 19, 49, 16))" ] } ], "source": [ "describe(checkins_df)" ] }, { "cell_type": "code", "execution_count": 132, "metadata": {}, "outputs": [], "source": [ "write_parquet(checkins_df, checkins_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# tip.json" ] }, { "cell_type": "code", "execution_count": 133, "metadata": {}, "outputs": [], "source": [ "tip_df = spark.read.json(tip_path)" ] }, { "cell_type": "code", "execution_count": 134, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "1223094\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, compliment_count=0, date=0, text=0, user_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'date': Row(max(length(date))=19), 'text': Row(max(length(text))=500), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- compliment_count: long (nullable = true)\n", " |-- date: string (nullable = true)\n", " |-- text: string (nullable = true)\n", " |-- user_id: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, date=u'2014-03-27 03:51:24', text=u'Great for watching games, ufc, and whatever else tickles yer fancy', user_id=u'UPw5DWs_b-e2JRBS-t37Ag')" ] } ], "source": [ "describe(tip_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## tips" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Assign to each record a unique id for convenience." 
] }, { "cell_type": "code", "execution_count": 135, "metadata": {}, "outputs": [], "source": [ "tips_df = tip_df.withColumnRenamed(\"date\", \"ts\")\\\n", " .withColumn(\"ts\", F.to_timestamp(\"ts\"))\\\n", " .withColumn(\"tip_id\", F.monotonically_increasing_id())" ] }, { "cell_type": "code", "execution_count": 136, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "1223094\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, compliment_count=0, ts=0, text=0, user_id=0, tip_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'text': Row(max(length(text))=500), 'user_id': Row(max(length(user_id))=22), 'business_id': Row(max(length(business_id))=22)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- compliment_count: long (nullable = true)\n", " |-- ts: timestamp (nullable = true)\n", " |-- text: string (nullable = true)\n", " |-- user_id: string (nullable = true)\n", " |-- tip_id: long (nullable = false)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, ts=datetime.datetime(2014, 3, 27, 3, 51, 24), text=u'Great for watching games, ufc, and whatever else tickles yer fancy', user_id=u'UPw5DWs_b-e2JRBS-t37Ag', tip_id=0)" ] } ], "source": [ "describe(tips_df)" ] }, { "cell_type": "code", "execution_count": 137, "metadata": {}, "outputs": [], "source": [ "write_parquet(tips_df, tips_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# photo.json" ] }, { "cell_type": "code", "execution_count": 138, "metadata": {}, "outputs": [], "source": [ "photo_df = spark.read.json(photo_path)" ] }, { "cell_type": "code", "execution_count": 139, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "200000\n", "--------------------------------------------\n", "nulls:\n", "[Row(business_id=0, caption=0, label=0, photo_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'caption': Row(max(length(caption))=140), 'photo_id': Row(max(length(photo_id))=22), 'business_id': Row(max(length(business_id))=22), 'label': Row(max(length(label))=7)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- business_id: string (nullable = true)\n", " |-- caption: string (nullable = true)\n", " |-- label: string (nullable = true)\n", " |-- photo_id: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(business_id=u'rcaPajgKOJC2vo_l3xa42A', caption=u'', label=u'inside', photo_id=u'MllA1nNpcp1kDteVg6OGUw')" ] } ], "source": [ "describe(photo_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Even if we do not store any photos, this table is useful for knowing how many and what kind of photos were taken." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## photos" ] }, { "cell_type": "code", "execution_count": 140, "metadata": { "scrolled": true }, "outputs": [], "source": [ "write_parquet(photo_df, photos_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# city_attributes.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the names of US cities supported by this dataset and assign to each a `city_id`. 
Requires reading the table `cities`." ] }, { "cell_type": "code", "execution_count": 141, "metadata": {}, "outputs": [], "source": [ "city_attr_df = spark.read\\\n", " .format('csv')\\\n", " .option(\"header\", \"true\")\\\n", " .option(\"delimiter\", \",\")\\\n", " .load(city_attr_path)" ] }, { "cell_type": "code", "execution_count": 142, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "36\n", "--------------------------------------------\n", "nulls:\n", "[Row(City=0, Country=0, Latitude=0, Longitude=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'Latitude': Row(max(length(Latitude))=9), 'City': Row(max(length(City))=17), 'Longitude': Row(max(length(Longitude))=11), 'Country': Row(max(length(Country))=13)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- City: string (nullable = true)\n", " |-- Country: string (nullable = true)\n", " |-- Latitude: string (nullable = true)\n", " |-- Longitude: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(City=u'Vancouver', Country=u'Canada', Latitude=u'49.24966', Longitude=u'-123.119339')" ] } ], "source": [ "describe(city_attr_df)" ] }, { "cell_type": "code", "execution_count": 143, "metadata": {}, "outputs": [], "source": [ "# We only want the list of US cities\n", "cities = city_attr_df.where(\"Country = 'United States'\")\\\n", " .select(\"City\")\\\n", " .distinct()\\\n", " .rdd.flatMap(lambda x: x)\\\n", " .collect()" ] }, { "cell_type": "code", "execution_count": 144, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[u'Phoenix', u'Dallas', u'San Antonio', u'Philadelphia', u'Los Angeles', u'Indianapolis', u'San Francisco', u'San Diego', u'Nashville', u'Detroit', u'Portland', u'Pittsburgh', u'Chicago', u'Atlanta', u'Las Vegas', u'Seattle', u'Kansas City', u'Saint Louis', u'Minneapolis', u'Houston', u'Jacksonville', u'Albuquerque', u'Miami', u'New York', u'Charlotte', u'Denver', u'Boston']" ] } ], "source": [ "# The list of cities provided by the weather dataset\n", "cities" ] }, { "cell_type": "code", "execution_count": 145, "metadata": {}, "outputs": [], "source": [ "# Weather dataset doesn't provide us with the respective state codes though\n", "# How do we know whether \"Phoenix\" is in AZ or TX?\n", "# The most appropriate solution is finding the biggest city\n", "# Let's find out which of those cities are referenced in Yelp dataset and relevant to us\n", "\n", "cities_df = spark.read.parquet(cities_path)" ] }, { "cell_type": "code", "execution_count": 146, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- city: string (nullable = true)\n", " |-- state_code: string (nullable = true)\n", " |-- total_population: long (nullable = true)\n", " |-- number_of_veterans: long (nullable = true)\n", " |-- male_population: long (nullable = true)\n", " |-- foreign_born: long (nullable = true)\n", " |-- average_household_size: double (nullable = true)\n", " |-- median_age: double (nullable = true)\n", " |-- state: string (nullable = true)\n", " |-- female_population: long (nullable = true)\n", " |-- american_indian_and_alaska_native: long (nullable = true)\n", " |-- asian: long (nullable = true)\n", " |-- black_or_african_american: long (nullable = true)\n", " |-- hispanic_or_latino: long (nullable = true)\n", " |-- white: long (nullable = true)\n", " |-- 
city_id: long (nullable = true)" ] } ], "source": [ "cities_df.printSchema()" ] }, { "cell_type": "code", "execution_count": 147, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(city=u'Phoenix'), Row(city=u'Dallas'), Row(city=u'Los Angeles'), Row(city=u'San Diego'), Row(city=u'Pittsburgh'), Row(city=u'Las Vegas'), Row(city=u'Seattle'), Row(city=u'New York'), Row(city=u'Charlotte'), Row(city=u'Denver'), Row(city=u'Boston')]" ] } ], "source": [ "cities_df.filter(F.col(\"city\").isin(cities))\\\n", " .select(\"city\")\\\n", " .distinct()\\\n", " .collect()\n", "# Tables \"cities\" includes 11 cities out of 36 provided by the weather dataset" ] }, { "cell_type": "code", "execution_count": 148, "metadata": {}, "outputs": [], "source": [ "# Now find their states (using Google or any other API)\n", "weather_cities_df = [\n", " Row(city='Phoenix', state_code='AZ'), \n", " Row(city='Dallas', state_code='TX'), \n", " Row(city='Los Angeles', state_code='CA'), \n", " Row(city='San Diego', state_code='CA'), \n", " Row(city='Pittsburgh', state_code='PA'), \n", " Row(city='Las Vegas', state_code='NV'), \n", " Row(city='Seattle', state_code='WA'), \n", " Row(city='New York', state_code='NY'), \n", " Row(city='Charlotte', state_code='NC'), \n", " Row(city='Denver', state_code='CO'), \n", " Row(city='Boston', state_code='MA')\n", "]\n", "weather_cities_schema = T.StructType([\n", " T.StructField(\"city\", T.StringType()),\n", " T.StructField(\"state_code\", T.StringType())\n", "])\n", "weather_cities_df = spark.createDataFrame(weather_cities_df, schema=weather_cities_schema) " ] }, { "cell_type": "code", "execution_count": 149, "metadata": {}, "outputs": [], "source": [ "# Join with the cities dataset to find matches\n", "weather_cities_df = cities_df.join(weather_cities_df, [\"city\", \"state_code\"])\\\n", " .select(\"city\", \"city_id\")\\\n", " .distinct()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# temperature.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read temperaturs recorded hourly, transform them into daily averages, and filter by our cities. Also, cities are columns, so transform them into rows." 
] }, { "cell_type": "code", "execution_count": 207, "metadata": {}, "outputs": [], "source": [ "weather_temp_df = spark.read\\\n", " .format('csv')\\\n", " .option(\"header\", \"true\")\\\n", " .option(\"delimiter\", \",\")\\\n", " .load(weather_temp_path)" ] }, { "cell_type": "code", "execution_count": 151, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "45253\n", "--------------------------------------------\n", "nulls:\n", "[Row(datetime=0, Vancouver=795, Portland=1, San Francisco=793, Seattle=3, Los Angeles=3, San Diego=1, Las Vegas=1, Phoenix=3, Albuquerque=1, Denver=1, San Antonio=1, Dallas=4, Houston=3, Kansas City=1, Minneapolis=13, Saint Louis=1, Chicago=3, Nashville=2, Indianapolis=7, Atlanta=6, Detroit=1, Jacksonville=1, Charlotte=3, Miami=805, Pittsburgh=3, Toronto=1, Philadelphia=3, New York=793, Montreal=3, Boston=3, Beersheba=798, Tel Aviv District=793, Eilat=792, Haifa=798, Nahariyya=797, Jerusalem=793)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'Eilat': Row(max(length(Eilat))=13), 'San Diego': Row(max(length(San Diego))=13), 'Chicago': Row(max(length(Chicago))=13), 'Philadelphia': Row(max(length(Philadelphia))=13), 'Denver': Row(max(length(Denver))=13), 'datetime': Row(max(length(datetime))=19), 'Dallas': Row(max(length(Dallas))=13), 'Nahariyya': Row(max(length(Nahariyya))=13), 'Vancouver': Row(max(length(Vancouver))=13), 'San Francisco': Row(max(length(San Francisco))=13), 'Indianapolis': Row(max(length(Indianapolis))=13), 'Phoenix': Row(max(length(Phoenix))=13), 'Pittsburgh': Row(max(length(Pittsburgh))=13), 'Nashville': Row(max(length(Nashville))=13), 'Albuquerque': Row(max(length(Albuquerque))=13), 'New York': Row(max(length(New York))=13), 'Los Angeles': Row(max(length(Los Angeles))=13), 'Atlanta': Row(max(length(Atlanta))=13), 'San Antonio': Row(max(length(San Antonio))=13), 'Toronto': Row(max(length(Toronto))=13), 'Haifa': Row(max(length(Haifa))=13), 'Charlotte': Row(max(length(Charlotte))=13), 'Miami': Row(max(length(Miami))=13), 'Kansas City': Row(max(length(Kansas City))=13), 'Detroit': Row(max(length(Detroit))=13), 'Saint Louis': Row(max(length(Saint Louis))=13), 'Tel Aviv District': Row(max(length(Tel Aviv District))=13), 'Montreal': Row(max(length(Montreal))=13), 'Houston': Row(max(length(Houston))=13), 'Jerusalem': Row(max(length(Jerusalem))=13), 'Boston': Row(max(length(Boston))=13), 'Minneapolis': Row(max(length(Minneapolis))=13), 'Jacksonville': Row(max(length(Jacksonville))=13), 'Beersheba': Row(max(length(Beersheba))=13), 'Las Vegas': Row(max(length(Las Vegas))=13), 'Portland': Row(max(length(Portland))=13), 'Seattle': Row(max(length(Seattle))=13)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- datetime: string (nullable = true)\n", " |-- Vancouver: string (nullable = true)\n", " |-- Portland: string (nullable = true)\n", " |-- San Francisco: string (nullable = true)\n", " |-- Seattle: string (nullable = true)\n", " |-- Los Angeles: string (nullable = true)\n", " |-- San Diego: string (nullable = true)\n", " |-- Las Vegas: string (nullable = true)\n", " |-- Phoenix: string (nullable = true)\n", " |-- Albuquerque: string (nullable = true)\n", " |-- Denver: string (nullable = true)\n", " |-- San Antonio: string (nullable = true)\n", " |-- Dallas: string (nullable = true)\n", " |-- Houston: string (nullable = true)\n", " |-- Kansas City: string (nullable = true)\n", " |-- Minneapolis: 
string (nullable = true)\n", " |-- Saint Louis: string (nullable = true)\n", " |-- Chicago: string (nullable = true)\n", " |-- Nashville: string (nullable = true)\n", " |-- Indianapolis: string (nullable = true)\n", " |-- Atlanta: string (nullable = true)\n", " |-- Detroit: string (nullable = true)\n", " |-- Jacksonville: string (nullable = true)\n", " |-- Charlotte: string (nullable = true)\n", " |-- Miami: string (nullable = true)\n", " |-- Pittsburgh: string (nullable = true)\n", " |-- Toronto: string (nullable = true)\n", " |-- Philadelphia: string (nullable = true)\n", " |-- New York: string (nullable = true)\n", " |-- Montreal: string (nullable = true)\n", " |-- Boston: string (nullable = true)\n", " |-- Beersheba: string (nullable = true)\n", " |-- Tel Aviv District: string (nullable = true)\n", " |-- Eilat: string (nullable = true)\n", " |-- Haifa: string (nullable = true)\n", " |-- Nahariyya: string (nullable = true)\n", " |-- Jerusalem: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(datetime=u'2012-10-01 12:00:00', Vancouver=None, Portland=None, San Francisco=None, Seattle=None, Los Angeles=None, San Diego=None, Las Vegas=None, Phoenix=None, Albuquerque=None, Denver=None, San Antonio=None, Dallas=None, Houston=None, Kansas City=None, Minneapolis=None, Saint Louis=None, Chicago=None, Nashville=None, Indianapolis=None, Atlanta=None, Detroit=None, Jacksonville=None, Charlotte=None, Miami=None, Pittsburgh=None, Toronto=None, Philadelphia=None, New York=None, Montreal=None, Boston=None, Beersheba=None, Tel Aviv District=None, Eilat=u'309.1', Haifa=None, Nahariyya=None, Jerusalem=None)" ] } ], "source": [ "describe(weather_temp_df)" ] }, { "cell_type": "code", "execution_count": 208, "metadata": {}, "outputs": [], "source": [ "# Extract date string from time string to be able to group by day\n", "weather_temp_df = weather_temp_df.select(\"datetime\", *cities)\\\n", " .withColumn(\"date\", F.substring(\"datetime\", 0, 10))\\\n", " .drop(\"datetime\")" ] }, { "cell_type": "code", "execution_count": 209, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(Phoenix=u'296.6', Dallas=u'289.74', San Antonio=u'289.29', Philadelphia=u'285.63', Los Angeles=u'291.87', Indianapolis=u'283.85', San Francisco=u'289.48', San Diego=u'291.53', Nashville=u'287.41', Detroit=u'284.03', Portland=u'282.08', Pittsburgh=u'281.0', Chicago=u'284.01', Atlanta=u'294.03', Las Vegas=u'293.41', Seattle=u'281.8', Kansas City=u'289.98', Saint Louis=u'286.18', Minneapolis=u'286.87', Houston=u'288.27', Jacksonville=u'298.17', Albuquerque=u'285.12', Miami=u'299.72', New York=u'288.22', Charlotte=u'288.65', Denver=u'284.61', Boston=u'287.17', date=u'2012-10-01')" ] } ], "source": [ "weather_temp_df.where(\"Phoenix is not null\").first()" ] }, { "cell_type": "code", "execution_count": 211, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(Phoenix=u'296.6'), Row(Phoenix=u'296.608508543'), Row(Phoenix=u'296.631487354'), Row(Phoenix=u'296.654466164'), Row(Phoenix=u'296.677444975'), Row(Phoenix=u'296.700423786'), Row(Phoenix=u'296.723402597'), Row(Phoenix=u'296.746381407'), Row(Phoenix=u'296.769360218'), Row(Phoenix=u'296.792339029'), Row(Phoenix=u'296.815317839')]" ] } ], "source": [ "phoenix_rows = weather_temp_df.where(\"Phoenix is not null and date = '2012-10-01'\").select(\"Phoenix\").collect()\n", "phoenix_rows" ] }, { "cell_type": "code", "execution_count": 213, "metadata": {}, "outputs": [ 
{ "name": "stdout", "output_type": "stream", "text": [ "296.7017392647272" ] } ], "source": [ "import numpy as np\n", "\n", "# For data quality check\n", "phoenix_mean_temp = np.mean([float(row.Phoenix) for row in phoenix_rows])\n", "phoenix_mean_temp" ] }, { "cell_type": "code", "execution_count": 215, "metadata": {}, "outputs": [], "source": [ "# To transform city columns into rows, transform each city individually and union all dataframes\n", "temp_df = None\n", "for city in cities:\n", " # Get average temperature in Fahrenheit for each day and city\n", " df = weather_temp_df.select(\"date\", city)\\\n", " .withColumnRenamed(city, \"temperature\")\\\n", " .withColumn(\"temperature\", F.col(\"temperature\").cast(\"double\"))\\\n", " .withColumn(\"city\", F.lit(city))\\\n", " .groupBy(\"date\", \"city\")\\\n", " .agg(F.mean(\"temperature\").alias(\"avg_temperature\"))\n", " if temp_df is None:\n", " temp_df = df\n", " else:\n", " temp_df = temp_df.union(df)\n", "weather_temp_df = temp_df" ] }, { "cell_type": "code", "execution_count": 216, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "50949" ] } ], "source": [ "# Speed up further joins\n", "weather_temp_df = weather_temp_df.repartition(1).cache()\n", "weather_temp_df.count()" ] }, { "cell_type": "code", "execution_count": 218, "metadata": {}, "outputs": [], "source": [ "phoenix_mean_temp2 = weather_temp_df.where(\"city = 'Phoenix' and date = '2012-10-01'\").collect()[0].avg_temperature\n", "\n", "assert(phoenix_mean_temp == phoenix_mean_temp2)\n", "# If we pass, the calculations are done correctly" ] }, { "cell_type": "code", "execution_count": 183, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "50949\n", "--------------------------------------------\n", "nulls:\n", "[Row(date=0, city=0, avg_temperature=99)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'date': Row(max(length(date))=10), 'city': Row(max(length(city))=13)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- date: string (nullable = true)\n", " |-- city: string (nullable = false)\n", " |-- avg_temperature: double (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(date=u'2012-10-19', city=u'Phoenix', avg_temperature=297.77833333333325)" ] } ], "source": [ "describe(weather_temp_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# weather_description.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read weather descriptions recorded hourly, pick the most frequent one on each day, and filter by our cities. \n", "\n", "The same as for temperatures, transform columns into rows." 
] }, { "cell_type": "code", "execution_count": 220, "metadata": {}, "outputs": [], "source": [ "weather_desc_df = spark.read\\\n", " .format('csv')\\\n", " .option(\"header\", \"true\")\\\n", " .option(\"delimiter\", \",\")\\\n", " .load(weather_desc_path)" ] }, { "cell_type": "code", "execution_count": 185, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "45253\n", "--------------------------------------------\n", "nulls:\n", "[Row(datetime=0, Vancouver=793, Portland=1, San Francisco=793, Seattle=1, Los Angeles=1, San Diego=1, Las Vegas=1, Phoenix=1, Albuquerque=1, Denver=1, San Antonio=1, Dallas=1, Houston=1, Kansas City=1, Minneapolis=1, Saint Louis=1, Chicago=1, Nashville=1, Indianapolis=1, Atlanta=1, Detroit=1, Jacksonville=1, Charlotte=1, Miami=793, Pittsburgh=1, Toronto=1, Philadelphia=1, New York=793, Montreal=1, Boston=1, Beersheba=793, Tel Aviv District=793, Eilat=792, Haifa=793, Nahariyya=793, Jerusalem=793)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'Eilat': Row(max(length(Eilat))=28), 'San Diego': Row(max(length(San Diego))=28), 'Chicago': Row(max(length(Chicago))=35), 'Philadelphia': Row(max(length(Philadelphia))=28), 'Denver': Row(max(length(Denver))=32), 'datetime': Row(max(length(datetime))=19), 'Dallas': Row(max(length(Dallas))=32), 'Nahariyya': Row(max(length(Nahariyya))=28), 'Vancouver': Row(max(length(Vancouver))=28), 'San Francisco': Row(max(length(San Francisco))=32), 'Indianapolis': Row(max(length(Indianapolis))=32), 'Phoenix': Row(max(length(Phoenix))=28), 'Pittsburgh': Row(max(length(Pittsburgh))=32), 'Nashville': Row(max(length(Nashville))=32), 'Albuquerque': Row(max(length(Albuquerque))=35), 'New York': Row(max(length(New York))=35), 'Los Angeles': Row(max(length(Los Angeles))=28), 'Atlanta': Row(max(length(Atlanta))=32), 'San Antonio': Row(max(length(San Antonio))=35), 'Toronto': Row(max(length(Toronto))=28), 'Haifa': Row(max(length(Haifa))=28), 'Charlotte': Row(max(length(Charlotte))=32), 'Miami': Row(max(length(Miami))=28), 'Kansas City': Row(max(length(Kansas City))=28), 'Detroit': Row(max(length(Detroit))=28), 'Saint Louis': Row(max(length(Saint Louis))=32), 'Tel Aviv District': Row(max(length(Tel Aviv District))=28), 'Montreal': Row(max(length(Montreal))=28), 'Houston': Row(max(length(Houston))=32), 'Jerusalem': Row(max(length(Jerusalem))=28), 'Boston': Row(max(length(Boston))=28), 'Minneapolis': Row(max(length(Minneapolis))=35), 'Jacksonville': Row(max(length(Jacksonville))=28), 'Beersheba': Row(max(length(Beersheba))=20), 'Las Vegas': Row(max(length(Las Vegas))=28), 'Portland': Row(max(length(Portland))=28), 'Seattle': Row(max(length(Seattle))=28)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- datetime: string (nullable = true)\n", " |-- Vancouver: string (nullable = true)\n", " |-- Portland: string (nullable = true)\n", " |-- San Francisco: string (nullable = true)\n", " |-- Seattle: string (nullable = true)\n", " |-- Los Angeles: string (nullable = true)\n", " |-- San Diego: string (nullable = true)\n", " |-- Las Vegas: string (nullable = true)\n", " |-- Phoenix: string (nullable = true)\n", " |-- Albuquerque: string (nullable = true)\n", " |-- Denver: string (nullable = true)\n", " |-- San Antonio: string (nullable = true)\n", " |-- Dallas: string (nullable = true)\n", " |-- Houston: string (nullable = true)\n", " |-- Kansas City: string (nullable = true)\n", " |-- Minneapolis: 
string (nullable = true)\n", " |-- Saint Louis: string (nullable = true)\n", " |-- Chicago: string (nullable = true)\n", " |-- Nashville: string (nullable = true)\n", " |-- Indianapolis: string (nullable = true)\n", " |-- Atlanta: string (nullable = true)\n", " |-- Detroit: string (nullable = true)\n", " |-- Jacksonville: string (nullable = true)\n", " |-- Charlotte: string (nullable = true)\n", " |-- Miami: string (nullable = true)\n", " |-- Pittsburgh: string (nullable = true)\n", " |-- Toronto: string (nullable = true)\n", " |-- Philadelphia: string (nullable = true)\n", " |-- New York: string (nullable = true)\n", " |-- Montreal: string (nullable = true)\n", " |-- Boston: string (nullable = true)\n", " |-- Beersheba: string (nullable = true)\n", " |-- Tel Aviv District: string (nullable = true)\n", " |-- Eilat: string (nullable = true)\n", " |-- Haifa: string (nullable = true)\n", " |-- Nahariyya: string (nullable = true)\n", " |-- Jerusalem: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(datetime=u'2012-10-01 12:00:00', Vancouver=None, Portland=None, San Francisco=None, Seattle=None, Los Angeles=None, San Diego=None, Las Vegas=None, Phoenix=None, Albuquerque=None, Denver=None, San Antonio=None, Dallas=None, Houston=None, Kansas City=None, Minneapolis=None, Saint Louis=None, Chicago=None, Nashville=None, Indianapolis=None, Atlanta=None, Detroit=None, Jacksonville=None, Charlotte=None, Miami=None, Pittsburgh=None, Toronto=None, Philadelphia=None, New York=None, Montreal=None, Boston=None, Beersheba=None, Tel Aviv District=None, Eilat=u'haze', Haifa=None, Nahariyya=None, Jerusalem=None)" ] } ], "source": [ "describe(weather_desc_df)" ] }, { "cell_type": "code", "execution_count": 221, "metadata": {}, "outputs": [], "source": [ "# Extract date string from time string to be able to group by day\n", "weather_desc_df = weather_desc_df.select(\"datetime\", *cities)\\\n", " .withColumn(\"date\", F.substring(\"datetime\", 0, 10))\\\n", " .drop(\"datetime\")" ] }, { "cell_type": "code", "execution_count": 222, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(Phoenix=u'sky is clear', Dallas=u'mist', San Antonio=u'sky is clear', Philadelphia=u'broken clouds', Los Angeles=u'mist', Indianapolis=u'overcast clouds', San Francisco=u'light rain', San Diego=u'sky is clear', Nashville=u'mist', Detroit=u'sky is clear', Portland=u'scattered clouds', Pittsburgh=u'mist', Chicago=u'overcast clouds', Atlanta=u'light rain', Las Vegas=u'sky is clear', Seattle=u'sky is clear', Kansas City=u'sky is clear', Saint Louis=u'sky is clear', Minneapolis=u'broken clouds', Houston=u'sky is clear', Jacksonville=u'scattered clouds', Albuquerque=u'sky is clear', Miami=u'light intensity drizzle', New York=u'few clouds', Charlotte=u'mist', Denver=u'light rain', Boston=u'sky is clear', date=u'2012-10-01')" ] } ], "source": [ "weather_desc_df.where(\"Phoenix is not null\").first()" ] }, { "cell_type": "code", "execution_count": 225, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(Phoenix=u'few clouds'), Row(Phoenix=u'scattered clouds'), Row(Phoenix=u'scattered clouds'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), Row(Phoenix=u'sky is clear'), 
Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'few clouds'), Row(Phoenix=u'sky is clear')]" ] } ], "source": [ "phoenix_rows = weather_desc_df.where(\"Phoenix is not null and date = '2012-12-10'\").select(\"Phoenix\").collect()\n", "phoenix_rows" ] }, { "cell_type": "code", "execution_count": 233, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "u'sky is clear'" ] } ], "source": [ "from collections import Counter\n", "\n", "# For data quality check\n", "phoenix_most_common_weather = Counter([row.Phoenix for row in phoenix_rows]).most_common()[0][0]\n", "phoenix_most_common_weather" ] }, { "cell_type": "code", "execution_count": 228, "metadata": {}, "outputs": [], "source": [ "# To transform city columns into rows, transform each city individually and union all dataframes\n", "temp_df = None\n", "for city in cities:\n", " # Get the most frequent description for each day and city\n", " window = Window.partitionBy(\"date\", \"city\").orderBy(F.desc(\"count\"))\n", " df = weather_desc_df.select(\"date\", city)\\\n", " .withColumnRenamed(city, \"weather_description\")\\\n", " .withColumn(\"city\", F.lit(city))\\\n", " .groupBy(\"date\", \"city\", \"weather_description\")\\\n", " .count()\\\n", " .withColumn(\"order\", F.row_number().over(window))\\\n", " .where(F.col(\"order\") == 1)\\\n", " .drop(\"count\", \"order\")\n", " if temp_df is None:\n", " temp_df = df\n", " else:\n", " temp_df = temp_df.union(df)\n", "weather_desc_df = temp_df" ] }, { "cell_type": "code", "execution_count": 229, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "50949" ] } ], "source": [ "# Speed up further joins\n", "weather_desc_df = weather_desc_df.repartition(1).cache()\n", "weather_desc_df.count()" ] }, { "cell_type": "code", "execution_count": 235, "metadata": {}, "outputs": [], "source": [ "phoenix_most_common_weather2 = weather_desc_df.where(\"city = 'Phoenix' and date = '2012-12-10'\").collect()[0].weather_description\n", "\n", "assert(phoenix_most_common_weather == phoenix_most_common_weather2)\n", "# If we pass, the calculations are done correctly" ] }, { "cell_type": "code", "execution_count": 189, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "50949\n", "--------------------------------------------\n", "nulls:\n", "[Row(date=0, city=0, weather_description=102)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'date': Row(max(length(date))=10), 'city': Row(max(length(city))=13), 'weather_description': Row(max(length(weather_description))=23)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- date: string (nullable = true)\n", " |-- city: string (nullable = false)\n", " |-- weather_description: string (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(date=u'2013-09-10', city=u'Phoenix', weather_description=u'light rain')" ] } ], "source": [ "describe(weather_desc_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## city_weather" ] }, { "cell_type": "code", "execution_count": 190, "metadata": {}, "outputs": [], "source": [ "# What was the weather in the city when the particular review was posted?\n", "# Join weather description 
with temperature, and keep only city ids which are present in Yelp\n", "city_weather_df = weather_temp_df.join(weather_desc_df, [\"city\", \"date\"])\\\n", " .join(weather_cities_df, \"city\")\\\n", " .drop(\"city\")\\\n", " .distinct()\\\n", " .withColumn(\"date\", F.to_date(\"date\"))" ] }, { "cell_type": "code", "execution_count": 191, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--------------------------------------------\n", "count:\n", "15096\n", "--------------------------------------------\n", "nulls:\n", "[Row(date=0, avg_temperature=33, weather_description=34, city_id=0)]\n", "--------------------------------------------\n", "max_str_lengths:\n", "{'weather_description': Row(max(length(weather_description))=23)}\n", "--------------------------------------------\n", "schema:\n", "root\n", " |-- date: date (nullable = true)\n", " |-- avg_temperature: double (nullable = true)\n", " |-- weather_description: string (nullable = true)\n", " |-- city_id: long (nullable = true)\n", "\n", "--------------------------------------------\n", "example:\n", "Row(date=datetime.date(2013, 11, 23), avg_temperature=285.16625, weather_description=u'sky is clear', city_id=146028888064)" ] } ], "source": [ "describe(city_weather_df)" ] }, { "cell_type": "code", "execution_count": 192, "metadata": {}, "outputs": [], "source": [ "write_parquet(city_weather_df, city_weather_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 4 }