{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## This notebook is part of Hadoop tutorials delivered by IT-DB group\n", "### SPARK DataFrame Hands-On Lab" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hands-On 1 - Construct a DataFrame from parquet file\n", "*This demostrates how to read a parquet file and construct a DataFrame*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### First lets create sparkcontext (sc) and SQLContext (sqlContext)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "scrolled": true }, "outputs": [], "source": [ "from pyspark import SparkContext, SQLContext, SparkConf" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "conf = SparkConf().setMaster(\"local\").set(\"spark.driver.memory\", \"1g\").set(\"spark.executor.memory\", \"1g\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "sc = SparkContext(conf = conf)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "scrolled": true }, "outputs": [], "source": [ "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Read the parquet file into DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "df = sqlContext.read.parquet('hadoop-tutorials-data/UN_Pop_Stats.parquet')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect the data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "df.show(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Print the schema of the DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Hands-On 2 - Calculate the year wise population of switzerland\n", "*This shows how to query dataframes and how to show explain plan*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### first, lets import the bits we need" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "scrolled": true }, "outputs": [], "source": [ "import pyspark.sql.functions as func" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### year wise population for all countries\n", "*you can see how you can filter, groupBy, aggregate and sort the dataframe*" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "y_df = df.filter(df.Sex == '\"Both\"') \\\n", " .groupBy(df.Location,df.Time) \\\n", " .agg(func.sum(df.Value*1000) \\\n", " .alias(\"Sum\")) \\\n", " .orderBy(df.Time)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### filter out for switzerland (or for that matter your country of choice)\n", "*you can see how select can be used on dataframes to select the columns you need*" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "c_df = y_df.filter(df.Location == '\"Switzerland\"') \\\n", " .select(df.Time,\"Sum\") \\\n", " .collect()" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "print(c_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### plot the results using matlibplot" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib notebook\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "plt.figure(figsize=(14,6))\n", "x_val = [x[0][1] for x in c_df]\n", "y_val = [x[1] for x in c_df]\n", "plt.plot(range(len(y_val)), y_val)\n", "plt.xticks(range(len(x_val)), x_val, size='small')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### finally you can view the explain plan generated by catalyst optimizer" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "y_df.explain()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hands-On 3 - Construct the dataframes using json\n", "*This demonstrates how json can be manipulated using dataframes*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Read a json into a dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "scrolled": true }, "outputs": [], "source": [ "df_json = sqlContext.read.json('hadoop-tutorials-data/meetup-final.json')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### dataframe can infer schema from json file" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "df_json.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Top events by rsvp's" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "df_json.groupBy(df_json.event.event_name,df_json.group.group_city,df_json.venue.venue_name) \\\n", " .count() \\\n", " .select(\"event[event_name]\",\"group[group_city]\",\"venue[venue_name]\",\"count\") \\\n", " .orderBy(\"count\", ascending = False) \\\n", " .show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### what if the json contains an array, no problem we can explode it" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "scrolled": true }, "outputs": [], "source": [ "from pyspark.sql.functions import explode" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "scrolled": true }, "outputs": [], "source": [ "df_json.select(df_json.event.event_name,explode(\"group.group_topics\")).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hands-On 4 - This demostrates how DataFrame can be persisted as table and issue queries against it" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Load the whitehouse vistor records parquet file into DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "wh_df = sqlContext.read.parquet(\"hadoop-tutorials-data/WH_VR.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Print the schema to understand the layout" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "wh_df.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { 
"collapsed": false }, "outputs": [], "source": [ "wh_df.columns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect the data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "wh_df.select(\"NAMELAST\",\"APPT_START_DATE\").show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### persist the DataFrame as temporary table" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "wh_df.registerTempTable(\"Vistor_Records\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### You can now use issue the queries against this table using sqlContext.sql interface\n", "*Count the vistors by last name*" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "count_by_name = sqlContext.sql(\"select NAMELAST, count(1) as count from Vistor_Records group by NAMELAST order by count desc\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "count_by_name.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*count the number of vistors by year*" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "count_by_day = sqlContext.sql(\"select year(APPT_START_DATE), count(1) as count from Vistor_Records \\\n", " where APPT_START_DATE is not null \\\n", " group by year(APPT_START_DATE) \\\n", " order by year(APPT_START_DATE)\").collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plt.figure(figsize=(14,6))\n", "x_val = [x[0] for x in sorted(count_by_day)]\n", "y_val = [x[1] for x in sorted(count_by_day)]\n", "plt.bar(range(len(y_val)), y_val)\n", "plt.xticks(range(len(x_val)), x_val, size='small')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Count the number of vistors by weekday\n", "*This demonstrates how you can create UDF - user defined function*" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from datetime import datetime\n", "from pyspark.sql.types import StringType\n", "def get_weekday(str):\n", " return str.strftime(\"%A\")\n", "sqlContext.registerFunction(\"get_weekday\", lambda x: \\\n", " get_weekday(x), \\\n", " StringType())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "count_by_wday = sqlContext.sql(\"select get_weekday(APPT_START_DATE), count(1) as count from Vistor_Records \\\n", " where APPT_START_DATE is not null \\\n", " group by get_weekday(APPT_START_DATE) \\\n", " order by count desc\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "count_by_wday.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Finally lets produce histogram on the group size" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "group_size = sqlContext.sql(\"select distinct UIN, Total_People from Vistor_Records \\\n", " where Total_People > 30 \\\n", " and Total_People < 200\").collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import numpy as np\n", 
"plt.figure(figsize=(14,6))\n", "x_val = [int(x[1]) for x in group_size]\n", "print min(x_val),max(x_val)\n", "bins = np.arange(min(x_val), max(x_val), 10) \n", "n, bins, patches = plt.hist(x_val, bins=bins, facecolor='green')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }