{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 2.2 Structured Data - File Formats\n", "\n", "This notebook demonstrates using SparkSQL to read, write various structured and semi-structured data formats and convert data between them.\n", "\n", "\n", "#### Reading CSV files\n", "\n", "Let's start by loading the `NSW Air Temperature` data set from `data/nsw_temp.csv` in the CSV format:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Product code,Bureau of Meteorology station number,Year,Month,Day,Maximum temperature (Degree C),Days of accumulation of maximum temperature,Quality\r\n", "IDCJAC0010,061087,1965,01,01,25.6,,Y\r\n", "IDCJAC0010,061087,1965,01,02,32.2,1,Y\r\n", "IDCJAC0010,061087,1965,01,03,23.1,1,Y\r\n", "IDCJAC0010,061087,1965,01,04,25.6,1,Y\r\n" ] } ], "source": [ "%%sh\n", "\n", "# preview the file first\n", "head -n 5 data/nsw_temp.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create a DataFrame from this file:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- _c0: string (nullable = true)\n", " |-- _c1: string (nullable = true)\n", " |-- _c2: string (nullable = true)\n", " |-- _c3: string (nullable = true)\n", " |-- _c4: string (nullable = true)\n", " |-- _c5: string (nullable = true)\n", " |-- _c6: string (nullable = true)\n", " |-- _c7: string (nullable = true)\n", "\n" ] } ], "source": [ "# `spark.read` provides methods for reading various data formats\n", "# including `csv()`\n", "\n", "airTemperatureDF = spark.read.csv('data/nsw_temp.csv')\n", "airTemperatureDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default the file is assumed to not have the header and all the columns are assumed to be strings.\n", "\n", "This can however be modified with additional options, for example in our case we want to obtain column names from the first line (header) and also infer the types of the colums from the data." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- Product code: string (nullable = true)\n", " |-- Bureau of Meteorology station number: integer (nullable = true)\n", " |-- Year: integer (nullable = true)\n", " |-- Month: integer (nullable = true)\n", " |-- Day: integer (nullable = true)\n", " |-- Maximum temperature (Degree C): double (nullable = true)\n", " |-- Days of accumulation of maximum temperature: integer (nullable = true)\n", " |-- Quality: string (nullable = true)\n", "\n" ] } ], "source": [ "# the file is actually malformed as it includes multiple header-like lines embedded in the data\n", "# we can use `mode='DROPMALFORMED'` to ignore these lines\n", "\n", "airTemperatureDF = spark.read.csv('data/nsw_temp.csv', inferSchema=True, header=True, mode='DROPMALFORMED')\n", "airTemperatureDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Product code</th>\n", " <th>Bureau of Meteorology station number</th>\n", " <th>Year</th>\n", " <th>Month</th>\n", " <th>Day</th>\n", " <th>Maximum temperature (Degree C)</th>\n", " <th>Days of accumulation of maximum temperature</th>\n", " <th>Quality</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>IDCJAC0010</td>\n", " <td>61087</td>\n", " <td>1965</td>\n", " <td>1</td>\n", " <td>1</td>\n", " <td>25.6</td>\n", " <td>NaN</td>\n", " <td>Y</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>IDCJAC0010</td>\n", " <td>61087</td>\n", " <td>1965</td>\n", " <td>1</td>\n", " <td>2</td>\n", " <td>32.2</td>\n", " <td>1.0</td>\n", " <td>Y</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>IDCJAC0010</td>\n", " <td>61087</td>\n", " <td>1965</td>\n", " <td>1</td>\n", " <td>3</td>\n", " <td>23.1</td>\n", " <td>1.0</td>\n", " <td>Y</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>IDCJAC0010</td>\n", " <td>61087</td>\n", " <td>1965</td>\n", " <td>1</td>\n", " <td>4</td>\n", " <td>25.6</td>\n", " <td>1.0</td>\n", " <td>Y</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>IDCJAC0010</td>\n", " <td>61087</td>\n", " <td>1965</td>\n", " <td>1</td>\n", " <td>5</td>\n", " <td>26.7</td>\n", " <td>1.0</td>\n", " <td>Y</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[Product code: string, Bureau of Meteorology station number: int, Year: int, Month: int, Day: int, Maximum temperature (Degree C): double, Days of accumulation of maximum temperature: int, Quality: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "display(airTemperatureDF.limit(5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Other options include specifying the separator, providing an explicit schema, setting the NA string and reading compressed files. For the complete list of options please check the documentation for [DataFrameReader](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader)."
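] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For example, here is a sketch of providing an explicit schema together with the separator and NA string options (the column names and types below are assumptions based on the inferred schema above):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType\n", "\n", "# an explicit schema avoids the extra pass over the data that `inferSchema` requires\n", "schema = StructType([\n", "    StructField('Product code', StringType()),\n", "    StructField('Bureau of Meteorology station number', IntegerType()),\n", "    StructField('Year', IntegerType()),\n", "    StructField('Month', IntegerType()),\n", "    StructField('Day', IntegerType()),\n", "    StructField('Maximum temperature (Degree C)', DoubleType()),\n", "    StructField('Days of accumulation of maximum temperature', IntegerType()),\n", "    StructField('Quality', StringType()),\n", "])\n", "\n", "# `sep` sets the field separator and `nullValue` the string to be read as null\n", "explicitDF = spark.read.csv('data/nsw_temp.csv', schema=schema, header=True,\n", "                            sep=',', nullValue='', mode='DROPMALFORMED')\n", "explicitDF.printSchema()"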
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Reading JSON files\n", "\n", "We will create a DataFrame from the JSON file at `data/tweets.json`, which contains JSON-encoded tweets (one per line).\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"CAPTURE_ID\":2,\"CAPTURED_AT\":\"2014-05-01 00:00:35\",\"ID\":461505248142438400,\"CREATED_AT\":\"2014-05-01 00:00:00\",\"TEXT\":\"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\\/\\/t.co\\/r5apnHxHAK\",\"SCREEN_NAME\":\"ia_yeah\",\"USER_ID\":1100797128,\"LANGUAGE\":\"en\",\"PROFILE_IMAGE_URL\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/456806985443192832\\/hFOsb_G__normal.jpeg\",\"USER_CREATED_AT\":\"2013-01-18 23:12:49\",\"COUNT\":60262,\"TIME_ZONE\":\"Hawaii\",\"UTC_OFFSET\":-36000,\"FOLLOWERS\":298,\"FRIENDS\":177,\"FROM_USER_NAME\":\"MIDDLE FINGER☠\",\"RETWEET_FLAG\":\"Y\",\"PROFILE_IMAGE_URL_HTTPS\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/456806985443192832\\/hFOsb_G__normal.jpeg\",\"ORIGINAL_TWEET_ID\":461505066588192768,\"ORIGINAL_TWEET_LOCATION\":\"Gold Coast, Australia\"}\n", "{\"CAPTURE_ID\":2,\"CAPTURED_AT\":\"2014-05-01 00:00:35\",\"ID\":461505250135142400,\"CREATED_AT\":\"2014-05-01 00:00:00\",\"TEXT\":\"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\\/\\/t.co\\/r5apnHxHAK\",\"SCREEN_NAME\":\"NinaMustifasari\",\"USER_ID\":554329827,\"LANGUAGE\":\"en\",\"PROFILE_IMAGE_URL\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/460202598784843776\\/fVYnO_uN_normal.jpeg\",\"LOCATION\":\"Not Ireland but Pati,Indonesia\",\"USER_CREATED_AT\":\"2012-04-15 20:34:55\",\"COUNT\":15707,\"TIME_ZONE\":\"Jakarta\",\"UTC_OFFSET\":25200,\"FOLLOWERS\":974,\"FRIENDS\":415,\"FROM_USER_NAME\":\"Nina Mustifasari\",\"RETWEET_FLAG\":\"Y\",\"PROFILE_IMAGE_URL_HTTPS\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/460202598784843776\\/fVYnO_uN_normal.jpeg\",\"ORIGINAL_TWEET_ID\":461505066588192768,\"ORIGINAL_TWEET_LOCATION\":\"Gold Coast, Australia\"}\n" ] } ], "source": [ "%%sh\n", "\n", "# Let's preview the file first\n", "\n", "head -n 2 data/tweets.json" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- CAPTURED_AT: string (nullable = true)\n", " |-- CAPTURE_ID: long (nullable = true)\n", " |-- COUNT: long (nullable = true)\n", " |-- CREATED_AT: string (nullable = true)\n", " |-- FOLLOWERS: long (nullable = true)\n", " |-- FRIENDS: long (nullable = true)\n", " |-- FROM_USER_NAME: string (nullable = true)\n", " |-- ID: long (nullable = true)\n", " |-- IN_REPLY_TO_STATUS_ID: long (nullable = true)\n", " |-- LANGUAGE: string (nullable = true)\n", " |-- LATITUDE: double (nullable = true)\n", " |-- LOCATION: string (nullable = true)\n", " |-- LONGITUDE: double (nullable = true)\n", " |-- ORIGINAL_TWEET_ID: long (nullable = true)\n", " |-- ORIGINAL_TWEET_LOCATION: string (nullable = true)\n", " |-- PROFILE_IMAGE_URL: string (nullable = true)\n", " |-- PROFILE_IMAGE_URL_HTTPS: string (nullable = true)\n", " |-- RETWEET_FLAG: string (nullable = true)\n", " |-- SCREEN_NAME: string (nullable = true)\n", " |-- TEXT: string (nullable = true)\n", " |-- TIME_ZONE: string (nullable = true)\n", " |-- TO_USER: string (nullable = true)\n", " |-- TO_USER_ID: long (nullable = true)\n", " |-- USER_CREATED_AT: 
string (nullable = true)\n", " |-- USER_ID: long (nullable = true)\n", " |-- UTC_OFFSET: long (nullable = true)\n", "\n" ] } ], "source": [ "# load the DataFrame from a JSON file with `spark.read.json()`\n", "tweetsDF = spark.read.json('data/tweets.json')\n", "tweetsDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>CAPTURED_AT</th>\n", " <th>CAPTURE_ID</th>\n", " <th>COUNT</th>\n", " <th>CREATED_AT</th>\n", " <th>FOLLOWERS</th>\n", " <th>FRIENDS</th>\n", " <th>FROM_USER_NAME</th>\n", " <th>ID</th>\n", " <th>IN_REPLY_TO_STATUS_ID</th>\n", " <th>LANGUAGE</th>\n", " <th>...</th>\n", " <th>PROFILE_IMAGE_URL_HTTPS</th>\n", " <th>RETWEET_FLAG</th>\n", " <th>SCREEN_NAME</th>\n", " <th>TEXT</th>\n", " <th>TIME_ZONE</th>\n", " <th>TO_USER</th>\n", " <th>TO_USER_ID</th>\n", " <th>USER_CREATED_AT</th>\n", " <th>USER_ID</th>\n", " <th>UTC_OFFSET</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>2014-05-01 00:00:35</td>\n", " <td>2</td>\n", " <td>60262</td>\n", " <td>2014-05-01 00:00:00</td>\n", " <td>298</td>\n", " <td>177</td>\n", " <td>MIDDLE FINGER☠</td>\n", " <td>461505248142438400</td>\n", " <td>None</td>\n", " <td>en</td>\n", " <td>...</td>\n", " <td>https://pbs.twimg.com/profile_images/456806985...</td>\n", " <td>Y</td>\n", " <td>ia_yeah</td>\n", " <td>RT @allisimpson: thank you for surprising me @...</td>\n", " <td>Hawaii</td>\n", " <td>None</td>\n", " <td>None</td>\n", " <td>2013-01-18 23:12:49</td>\n", " <td>1100797128</td>\n", " <td>-36000</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2014-05-01 00:00:35</td>\n", " <td>2</td>\n", " <td>15707</td>\n", " <td>2014-05-01 00:00:00</td>\n", " <td>974</td>\n", " <td>415</td>\n", " <td>Nina Mustifasari</td>\n", " <td>461505250135142400</td>\n", " <td>None</td>\n", " <td>en</td>\n", " <td>...</td>\n", " <td>https://pbs.twimg.com/profile_images/460202598...</td>\n", " <td>Y</td>\n", " <td>NinaMustifasari</td>\n", " <td>RT @allisimpson: thank you for surprising me @...</td>\n", " <td>Jakarta</td>\n", " <td>None</td>\n", " <td>None</td>\n", " <td>2012-04-15 20:34:55</td>\n", " <td>554329827</td>\n", " <td>25200</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>2014-05-01 00:00:35</td>\n", " <td>2</td>\n", " <td>22131</td>\n", " <td>2014-05-01 00:00:01</td>\n", " <td>3115</td>\n", " <td>370</td>\n", " <td>Amber (◕‿◕✿)</td>\n", " <td>461505252017975296</td>\n", " <td>None</td>\n", " <td>en</td>\n", " <td>...</td>\n", " <td>https://pbs.twimg.com/profile_images/460048163...</td>\n", " <td>N</td>\n", " <td>ambershutup</td>\n", " <td>I also really was sour cream</td>\n", " <td>Brisbane</td>\n", " <td>None</td>\n", " <td>None</td>\n", " <td>2011-07-06 15:56:18</td>\n", " <td>330160256</td>\n", " <td>36000</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "<p>3 rows × 26 columns</p>\n", "</div>" ], "text/plain": [ "DataFrame[CAPTURED_AT: string, CAPTURE_ID: bigint, COUNT: bigint, CREATED_AT: string, FOLLOWERS: bigint, FRIENDS: bigint, FROM_USER_NAME: string, ID: bigint, IN_REPLY_TO_STATUS_ID: bigint, LANGUAGE: string, LATITUDE: double, LOCATION: string, LONGITUDE: double, ORIGINAL_TWEET_ID: bigint, ORIGINAL_TWEET_LOCATION: string, PROFILE_IMAGE_URL: string, PROFILE_IMAGE_URL_HTTPS: string, RETWEET_FLAG: string, SCREEN_NAME: string, TEXT: string, TIME_ZONE: string, TO_USER: string, TO_USER_ID: bigint, 
USER_CREATED_AT: string, USER_ID: bigint, UTC_OFFSET: bigint]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "display(tweetsDF.limit(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As with `csv` files, SparkSQL infers the schema (both the column names and the types) from the data.\n", "\n", "SparkSQL can also read data from `parquet` and `orc` files, and extensions are available for additional formats (e.g. `avro`)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Writing CSV files\n", "\n", "We can save any DataFrame (original or transformed) to a `csv` file:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>ORIGINAL_TWEET_LOCATION</th>\n", " <th>count</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>None</td>\n", " <td>3489</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>Gold Coast, Australia</td>\n", " <td>699</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>NC</td>\n", " <td>284</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>Brisbane, Australia</td>\n", " <td>131</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>Brisbane</td>\n", " <td>64</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", " <td>Nc</td>\n", " <td>34</td>\n", " </tr>\n", " <tr>\n", " <th>6</th>\n", " <td>Surfers Paradise</td>\n", " <td>21</td>\n", " </tr>\n", " <tr>\n", " <th>7</th>\n", " <td>Emerald City</td>\n", " <td>14</td>\n", " </tr>\n", " <tr>\n", " <th>8</th>\n", " <td>Townsville</td>\n", " <td>13</td>\n", " </tr>\n", " <tr>\n", " <th>9</th>\n", " <td>The City of Townsville</td>\n", " <td>12</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[ORIGINAL_TWEET_LOCATION: string, count: bigint]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.functions import col, desc\n", "\n", "# create a derived DataFrame\n", "tweetsByLocationDF = tweetsDF.groupBy('ORIGINAL_TWEET_LOCATION').count().sort(desc('count')).limit(10)\n", "display(tweetsByLocationDF)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Save the derived DataFrame to a CSV file.\n", "# The `mode` parameter specifies the behavior of the write operation when the data already exists.\n", "# By default an exception is thrown, but setting it to 'overwrite' will overwrite the existing data.\n", "\n", "tweetsByLocationDF.write.csv('output/tweets_by_location.csv', mode='overwrite', header=True)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 8\n", "-rw-r--r-- 1 szu004 staff 0 10 Jul 20:36 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 191 10 Jul 20:36 part-00000-e2998e26-2859-4678-b3b1-14c221aee6d4.csv\n", "\n", "Content:\n", "ORIGINAL_TWEET_LOCATION,count\n", ",3489\n", "\"Gold Coast, Australia\",699\n", "NC,284\n", "\"Brisbane, Australia\",131\n" ] } ], "source": [ "%%sh\n", "\n", "# preview the output\n", "\n", "ls -l output/tweets_by_location.csv\n", "\n", "echo\n", "echo \"Content:\"\n", "\n", "head -n 5 output/tweets_by_location.csv/part*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, we can write DataFrames in the `parquet` format, which is much more efficient both in terms of the data size and the
performance of processing. Let's save the `NSW Air Temperature` data in the `parquet` format." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- Product_code: string (nullable = true)\n", " |-- Bureau_of_Meteorology_station_number: integer (nullable = true)\n", " |-- Year: integer (nullable = true)\n", " |-- Month: integer (nullable = true)\n", " |-- Day: integer (nullable = true)\n", " |-- Maximum_temperature__Degree_C_: double (nullable = true)\n", " |-- Days_of_accumulation_of_maximum_temperature: integer (nullable = true)\n", " |-- Quality: string (nullable = true)\n", "\n" ] } ], "source": [ "import re\n", "\n", "# we need to rename the columns first because `parquet` column names cannot contain\n", "# spaces or other special characters\n", "\n", "renamedDF = airTemperatureDF.select(*[col(c).alias(re.sub(r'[ ()]','_', c)) for c in airTemperatureDF.columns])\n", "renamedDF.printSchema()\n", "renamedDF.write.parquet('output/nsw_temp.parquet', mode='overwrite')" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 840\n", "-rw-r--r-- 1 szu004 staff 0B 10 Jul 20:37 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00000-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 133K 10 Jul 20:37 part-00001-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 135K 10 Jul 20:37 part-00002-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "-rw-r--r-- 1 szu004 staff 11K 10 Jul 20:37 part-00003-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet\n", "\n", "'csv' data size:\n", "-rw-r--r-- 1 szu004 staff 13M 9 Jul 20:00 data/nsw_temp.csv\n" ] } ], "source": [ "%%sh\n", "\n", "# let's preview the results\n", "ls -lh output/nsw_temp.parquet\n", "\n", "echo\n", "\n", "# compare with the size of the original `csv` file\n", "echo \"'csv' data size:\"\n", "\n", "ls -lh data/nsw_temp.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `parquet` file is compressed with `snappy` by default (`gzip` is also available) and is more than an order of magnitude smaller than the original `csv` file.\n", "\n", "For more information on writing DataFrames please check the documentation for [DataFrameWriter](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter)."
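] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To check the round trip, here is a small sketch that reads the `parquet` output back into a DataFrame (it assumes the `output/nsw_temp.parquet` directory written above; unlike `csv`, no schema inference is needed because `parquet` stores the schema with the data):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# read the `parquet` data back; the column names and types\n", "# come from the metadata stored in the file itself\n", "parquetDF = spark.read.parquet('output/nsw_temp.parquet')\n", "parquetDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can now play around modifying pieces of the code.\n", "\n", "When you are done and you are running off the local machine remember to *close the notebook* with `File/Close and Halt`." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }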