{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 2.2 Structured Data - File Formats\n", "\n", "This notebook demonstrates using SparkSQL to read, write various structured and semi-structured data formats and convert data between them.\n", "\n", "\n", "#### Reading CSV files\n", "\n", "Let's start by loading the `NSW Air Temperature` data set from `data/nsw_temp.csv` in the CSV format:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Product code,Bureau of Meteorology station number,Year,Month,Day,Maximum temperature (Degree C),Days of accumulation of maximum temperature,Quality\r\n", "IDCJAC0010,061087,1965,01,01,25.6,,Y\r\n", "IDCJAC0010,061087,1965,01,02,32.2,1,Y\r\n", "IDCJAC0010,061087,1965,01,03,23.1,1,Y\r\n", "IDCJAC0010,061087,1965,01,04,25.6,1,Y\r\n" ] } ], "source": [ "%%sh\n", "\n", "# preview the file first\n", "head -n 5 data/nsw_temp.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create a DataFrame from this file:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- _c0: string (nullable = true)\n", " |-- _c1: string (nullable = true)\n", " |-- _c2: string (nullable = true)\n", " |-- _c3: string (nullable = true)\n", " |-- _c4: string (nullable = true)\n", " |-- _c5: string (nullable = true)\n", " |-- _c6: string (nullable = true)\n", " |-- _c7: string (nullable = true)\n", "\n" ] } ], "source": [ "# `spark.read` provides methods for reading various data formats\n", "# including `csv()`\n", "\n", "airTemperatureDF = spark.read.csv('data/nsw_temp.csv')\n", "airTemperatureDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default the file is assumed to not have the header and all the columns are assumed to be strings.\n", "\n", "This can however be modified with additional options, for example in our case we want to obtain column names from the first line (header) and also infer the types of the colums from the data." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- Product code: string (nullable = true)\n", " |-- Bureau of Meteorology station number: integer (nullable = true)\n", " |-- Year: integer (nullable = true)\n", " |-- Month: integer (nullable = true)\n", " |-- Day: integer (nullable = true)\n", " |-- Maximum temperature (Degree C): double (nullable = true)\n", " |-- Days of accumulation of maximum temperature: integer (nullable = true)\n", " |-- Quality: string (nullable = true)\n", "\n" ] } ], "source": [ "# the file is actually malformend as it includes multiple header like lines embedded in the data\n", "# we can use `mode` = 'DROPMALFORMED' to ignore these lines\n", "\n", "airTemperatureDF = spark.read.csv('data/nsw_temp.csv', inferSchema=True, header=True, mode = 'DROPMALFORMED')\n", "airTemperatureDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Product codeBureau of Meteorology station numberYearMonthDayMaximum temperature (Degree C)Days of accumulation of maximum temperatureQuality
0IDCJAC00106108719651125.6NaNY
1IDCJAC00106108719651232.21.0Y
2IDCJAC00106108719651323.11.0Y
3IDCJAC00106108719651425.61.0Y
4IDCJAC00106108719651526.71.0Y
\n", "
" ], "text/plain": [ "DataFrame[Product code: string, Bureau of Meteorology station number: int, Year: int, Month: int, Day: int, Maximum temperature (Degree C): double, Days of accumulation of maximum temperature: int, Quality: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "display(airTemperatureDF.limit(5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Other options include specifying the separator, providing the explicit schema, specifying the NA string and reading a compressed file. For the complete list of options please check the documentation for [DataFrameReader](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Working with JSON files\n", "\n", "We will create a DataFrom from the JSON file at: `data/tweets.json`, which contains JSON encode tweets (one per line).\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"CAPTURE_ID\":2,\"CAPTURED_AT\":\"2014-05-01 00:00:35\",\"ID\":461505248142438400,\"CREATED_AT\":\"2014-05-01 00:00:00\",\"TEXT\":\"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\\/\\/t.co\\/r5apnHxHAK\",\"SCREEN_NAME\":\"ia_yeah\",\"USER_ID\":1100797128,\"LANGUAGE\":\"en\",\"PROFILE_IMAGE_URL\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/456806985443192832\\/hFOsb_G__normal.jpeg\",\"USER_CREATED_AT\":\"2013-01-18 23:12:49\",\"COUNT\":60262,\"TIME_ZONE\":\"Hawaii\",\"UTC_OFFSET\":-36000,\"FOLLOWERS\":298,\"FRIENDS\":177,\"FROM_USER_NAME\":\"MIDDLE FINGER☠\",\"RETWEET_FLAG\":\"Y\",\"PROFILE_IMAGE_URL_HTTPS\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/456806985443192832\\/hFOsb_G__normal.jpeg\",\"ORIGINAL_TWEET_ID\":461505066588192768,\"ORIGINAL_TWEET_LOCATION\":\"Gold Coast, Australia\"}\n", "{\"CAPTURE_ID\":2,\"CAPTURED_AT\":\"2014-05-01 00:00:35\",\"ID\":461505250135142400,\"CREATED_AT\":\"2014-05-01 00:00:00\",\"TEXT\":\"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\\/\\/t.co\\/r5apnHxHAK\",\"SCREEN_NAME\":\"NinaMustifasari\",\"USER_ID\":554329827,\"LANGUAGE\":\"en\",\"PROFILE_IMAGE_URL\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/460202598784843776\\/fVYnO_uN_normal.jpeg\",\"LOCATION\":\"Not Ireland but Pati,Indonesia\",\"USER_CREATED_AT\":\"2012-04-15 20:34:55\",\"COUNT\":15707,\"TIME_ZONE\":\"Jakarta\",\"UTC_OFFSET\":25200,\"FOLLOWERS\":974,\"FRIENDS\":415,\"FROM_USER_NAME\":\"Nina Mustifasari\",\"RETWEET_FLAG\":\"Y\",\"PROFILE_IMAGE_URL_HTTPS\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/460202598784843776\\/fVYnO_uN_normal.jpeg\",\"ORIGINAL_TWEET_ID\":461505066588192768,\"ORIGINAL_TWEET_LOCATION\":\"Gold Coast, Australia\"}\n" ] } ], "source": [ "%%sh\n", "\n", "# Let's preview the file first\n", "\n", "head -n 2 data/tweets.json" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- CAPTURED_AT: string (nullable = true)\n", " |-- CAPTURE_ID: long (nullable = true)\n", " |-- COUNT: long (nullable = true)\n", " |-- CREATED_AT: string (nullable = true)\n", " |-- FOLLOWERS: long (nullable = true)\n", " |-- FRIENDS: long (nullable = true)\n", " |-- FROM_USER_NAME: string (nullable = true)\n", " |-- ID: long (nullable = true)\n", " |-- IN_REPLY_TO_STATUS_ID: long 
(nullable = true)\n", " |-- LANGUAGE: string (nullable = true)\n", " |-- LATITUDE: double (nullable = true)\n", " |-- LOCATION: string (nullable = true)\n", " |-- LONGITUDE: double (nullable = true)\n", " |-- ORIGINAL_TWEET_ID: long (nullable = true)\n", " |-- ORIGINAL_TWEET_LOCATION: string (nullable = true)\n", " |-- PROFILE_IMAGE_URL: string (nullable = true)\n", " |-- PROFILE_IMAGE_URL_HTTPS: string (nullable = true)\n", " |-- RETWEET_FLAG: string (nullable = true)\n", " |-- SCREEN_NAME: string (nullable = true)\n", " |-- TEXT: string (nullable = true)\n", " |-- TIME_ZONE: string (nullable = true)\n", " |-- TO_USER: string (nullable = true)\n", " |-- TO_USER_ID: long (nullable = true)\n", " |-- USER_CREATED_AT: string (nullable = true)\n", " |-- USER_ID: long (nullable = true)\n", " |-- UTC_OFFSET: long (nullable = true)\n", "\n" ] } ], "source": [ "# load the DataFrame from a JSON file with `spark.read.json()`\n", "tweetsDF = spark.read.json('data/tweets.json')\n", "tweetsDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
CAPTURED_ATCAPTURE_IDCOUNTCREATED_ATFOLLOWERSFRIENDSFROM_USER_NAMEIDIN_REPLY_TO_STATUS_IDLANGUAGE...PROFILE_IMAGE_URL_HTTPSRETWEET_FLAGSCREEN_NAMETEXTTIME_ZONETO_USERTO_USER_IDUSER_CREATED_ATUSER_IDUTC_OFFSET
02014-05-01 00:00:352602622014-05-01 00:00:00298177MIDDLE FINGER☠461505248142438400Noneen...https://pbs.twimg.com/profile_images/456806985...Yia_yeahRT @allisimpson: thank you for surprising me @...HawaiiNoneNone2013-01-18 23:12:491100797128-36000
12014-05-01 00:00:352157072014-05-01 00:00:00974415Nina Mustifasari461505250135142400Noneen...https://pbs.twimg.com/profile_images/460202598...YNinaMustifasariRT @allisimpson: thank you for surprising me @...JakartaNoneNone2012-04-15 20:34:5555432982725200
22014-05-01 00:00:352221312014-05-01 00:00:013115370Amber (◕‿◕✿)461505252017975296Noneen...https://pbs.twimg.com/profile_images/460048163...NambershutupI also really was sour creamBrisbaneNoneNone2011-07-06 15:56:1833016025636000
\n", "

3 rows × 26 columns

\n", "
" ], "text/plain": [ "DataFrame[CAPTURED_AT: string, CAPTURE_ID: bigint, COUNT: bigint, CREATED_AT: string, FOLLOWERS: bigint, FRIENDS: bigint, FROM_USER_NAME: string, ID: bigint, IN_REPLY_TO_STATUS_ID: bigint, LANGUAGE: string, LATITUDE: double, LOCATION: string, LONGITUDE: double, ORIGINAL_TWEET_ID: bigint, ORIGINAL_TWEET_LOCATION: string, PROFILE_IMAGE_URL: string, PROFILE_IMAGE_URL_HTTPS: string, RETWEET_FLAG: string, SCREEN_NAME: string, TEXT: string, TIME_ZONE: string, TO_USER: string, TO_USER_ID: bigint, USER_CREATED_AT: string, USER_ID: bigint, UTC_OFFSET: bigint]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "display(tweetsDF.limit(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly as with the `csv` files SparkSQL infers the schema (both the column names and the types) from the data.\n", "\n", "SparkSQL can also read data from `parquet` and `orc` files and extensions are available for additional formats (e.g. `avro`)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Writing CSV files\n", "\n", "We can save any DataFrame (original or tranformed) to a `csv` file:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ORIGINAL_TWEET_LOCATIONcount
0None3489
1Gold Coast, Australia699
2NC284
3Brisbane, Australia131
4Brisbane64
5Nc34
6Surfers Paradise21
7Emerald City14
8Townsville13
9The City of Townsville12
\n", "
" ], "text/plain": [ "DataFrame[ORIGINAL_TWEET_LOCATION: string, count: bigint]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.functions import col, desc\n", "\n", "# create a derived DataFrame\n", "tweetsByLocationDF = tweetsDF.groupBy('ORIGINAL_TWEET_LOCATION').count().sort(desc('count')).limit(10)\n", "display(tweetsByLocationDF)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Save the derived DataFrame to a CSV file.\n", "# The `mode` parameter specifies the behavior of the write operation when the data already exists.\n", "# By default an exception is thrown, but setting it to 'overwrite' will overwrite the exising data.\n", "\n", "tweetsByLocationDF.write.csv('output/tweets_by_location.csv', mode='overwrite', header=True)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 8\n", "-rw-r--r-- 1 szu004 staff 0 10 Jul 20:36 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 191 10 Jul 20:36 part-00000-e2998e26-2859-4678-b3b1-14c221aee6d4.csv\n", "\n", "Content:\n", "ORIGINAL_TWEET_LOCATION,count\n", ",3489\n", "\"Gold Coast, Australia\",699\n", "NC,284\n", "\"Brisbane, Australia\",131\n" ] } ], "source": [ "%%sh\n", "\n", "# preview the output\n", "\n", "ls -l output/tweets_by_location.csv\n", "\n", "echo\n", "echo \"Content:\"\n", "\n", "head -n 5 output/tweets_by_location.csv/part*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly we can DataFrames in the `parquet` format, which is much more efficient both in terms of the data size and the performance of processing. Let's save the `NSW Air Temperature Data` in `parquet` format." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- Product_code: string (nullable = true)\n", " |-- Bureau_of_Meteorology_station_number: integer (nullable = true)\n", " |-- Year: integer (nullable = true)\n", " |-- Month: integer (nullable = true)\n", " |-- Day: integer (nullable = true)\n", " |-- Maximum_temperature__Degree_C_: double (nullable = true)\n", " |-- Days_of_accumulation_of_maximum_temperature: integer (nullable = true)\n", " |-- Quality: string (nullable = true)\n", "\n" ] } ], "source": [ "import re\n", "\n", "# we need to rename the colums first because the names of `parquet` columns cannot containt\n", "# spaces or other special characters.\n", "\n", "renamedDF = airTemperatureDF.select(*[col(c).alias(re.sub(r'[ ()]','_', c)) for c in airTemperatureDF.columns])\n", "renamedDF.printSchema()\n", "renamedDF.write.parquet('output/nsw_temp.parquet', mode='overwrite')" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 840\n", "-rw-r--r-- 1 szu004 staff 0B 10 Jul 20:37 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00000-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00001-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 135K 10 Jul 20:37 part-00002-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 11K 10 Jul 20:37 part-00003-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "\n", "'cvs' data size:\n", "-rw-r--r-- 1 szu004 staff 13M 9 Jul 20:00 data/nsw_temp.csv\n" ] } ], "source": [ "%%sh\n", "\n", "# let's 
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 840\n", "-rw-r--r-- 1 szu004 staff 0B 10 Jul 20:37 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00000-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00001-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 135K 10 Jul 20:37 part-00002-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 11K 10 Jul 20:37 part-00003-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "\n", "'csv' data size:\n", "-rw-r--r-- 1 szu004 staff 13M 9 Jul 20:00 data/nsw_temp.csv\n" ] } ], "source": [ "%%sh\n", "\n", "# let's preview the results\n", "ls -lh output/nsw_temp.parquet\n", "\n", "echo\n", "\n", "# compare with the size of the original `csv` file\n", "echo \"'csv' data size:\"\n", "\n", "ls -lh data/nsw_temp.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `parquet` output is compressed with `snappy` (or optionally `gzip`) and is an order of magnitude smaller than the original `csv` file.\n", "\n", "For more information on writing DataFrames please check the documentation for [DataFrameWriter](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter).\n", "\n", "You can now play around with modifying pieces of the code.\n", "\n", "When you are done, and if you are running on a local machine, remember to *close the notebook* with `File/Close and Halt`." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }