{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## This notebook is part of Hadoop and Spark training delivered by CERN IT\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### SPARK DataFrame Hands-On Lab\n", "Contact: Luca.Canali@cern.ch\n", "\n", "### Objective: Perform Basic DataFrame Operations\n", "1. Creating DataFrames\n", "2. Select columns\n", "3. Add, rename and drop columns\n", "4. Filtering rows\n", "5. Aggregations" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Exercises and solutions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Reminder: documentation at \n", "https://spark.apache.org/docs/latest/api/python/index.html" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the SparkSession\n", "# and read the dataset\n", "\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder \\\n", " .master(\"local[*]\") \\\n", " .appName(\"DataFrame HandsOn 1\") \\\n", " .config(\"spark.ui.showConsoleProgress\",\"false\") \\\n", " .getOrCreate()\n", "\n", "online_retail_schema=\"InvoiceNo int, StockCode string, Description string, Quantity int,\\\n", "InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string\"\n", "\n", "df = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .option(\"timestampFormat\", \"M/d/yyyy H:m\")\\\n", " .csv(\"../data/online-retail-dataset.csv.gz\",\n", " schema=online_retail_schema)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.1
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
DataFrame HandsOn 1
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
InvoiceNoStockCodeDescriptionQuantityInvoiceDateUnitPriceCustomerIdCountry
053636585123AWHITE HANGING HEART T-LIGHT HOLDER62010-12-01 08:26:002.5517850United Kingdom
153636571053WHITE METAL LANTERN62010-12-01 08:26:003.3917850United Kingdom
253636584406BCREAM CUPID HEARTS COAT HANGER82010-12-01 08:26:002.7517850United Kingdom
353636584029GKNITTED UNION FLAG HOT WATER BOTTLE62010-12-01 08:26:003.3917850United Kingdom
453636584029ERED WOOLLY HOTTIE WHITE HEART.62010-12-01 08:26:003.3917850United Kingdom
\n", "
" ], "text/plain": [ " InvoiceNo StockCode Description Quantity \\\n", "0 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 \n", "1 536365 71053 WHITE METAL LANTERN 6 \n", "2 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 \n", "3 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 \n", "4 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 \n", "\n", " InvoiceDate UnitPrice CustomerId Country \n", "0 2010-12-01 08:26:00 2.55 17850 United Kingdom \n", "1 2010-12-01 08:26:00 3.39 17850 United Kingdom \n", "2 2010-12-01 08:26:00 2.75 17850 United Kingdom \n", "3 2010-12-01 08:26:00 3.39 17850 United Kingdom \n", "4 2010-12-01 08:26:00 3.39 17850 United Kingdom " ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.limit(5).toPandas()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Show 5 lines of the \"description\" column " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------------------------+\n", "|description |\n", "+-----------------------------------+\n", "|WHITE HANGING HEART T-LIGHT HOLDER |\n", "|WHITE METAL LANTERN |\n", "|CREAM CUPID HEARTS COAT HANGER |\n", "|KNITTED UNION FLAG HOT WATER BOTTLE|\n", "|RED WOOLLY HOTTIE WHITE HEART. 
|\n", "+-----------------------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "df.select(\"description\").show(5,truncate=False)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Count the number of distinct invoices in the dataframe" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------------+\n", "|count(DISTINCT InvoiceNo)|\n", "+-------------------------+\n", "| 22061|\n", "+-------------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import countDistinct\n", "\n", "df.select(countDistinct(\"InvoiceNo\")).show()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Find out in which month most invoices have been processed" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+-----+\n", "|month(InvoiceDate)|count|\n", "+------------------+-----+\n", "| 2|27707|\n", "| 4|29916|\n", "| 1|35147|\n", "| 8|35284|\n", "| 3|36748|\n", "| 6|36874|\n", "| 5|37030|\n", "| 7|39518|\n", "| 9|50226|\n", "| 10|60742|\n", "| 12|68006|\n", "| 11|84711|\n", "+------------------+-----+\n", "\n" ] } ], "source": [ "# This shows how many line items have been processed per month\n", "\n", "from pyspark.sql.functions import month\n", "df.groupby(month(\"InvoiceDate\")).count().sort(\"count\").show()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+----------------+\n", "|month(InvoiceDate)|DistinctInvoices|\n", "+------------------+----------------+\n", "| 11| 3021|\n", "| 12| 2568|\n", "| 10| 2275|\n", "| 9| 1994|\n", "| 5| 1848|\n", "| 6| 1683|\n", "| 3| 1665|\n", "| 7| 1657|\n", "| 4| 1504|\n", "| 8| 1456|\n", "| 
1| 1216|\n", "| 2| 1174|\n", "+------------------+----------------+\n", "\n" ] } ], "source": [ "# This shows how many distinct invoices have been processed per month\n", "\n", "from pyspark.sql.functions import col, month, countDistinct\n", "\n", "(df\n", " .groupBy(month('InvoiceDate'))\n", " .agg(countDistinct('InvoiceNo').alias('DistinctInvoices'))\n", " .orderBy(col('DistinctInvoices').desc())\n", " .show()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Task: Filter the rows where Quantity is greater than 30" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 536367| 84879|ASSORTED COLOUR B...| 32|2010-12-01 08:34:00| 1.69| 13047|United Kingdom|\n", "| 536370| 10002|INFLATABLE POLITI...| 48|2010-12-01 08:45:00| 0.85| 12583| France|\n", "| 536370| 22492|MINI PAINT SET VI...| 36|2010-12-01 08:45:00| 0.65| 12583| France|\n", "| 536371| 22086|PAPER CHAIN KIT 5...| 80|2010-12-01 09:00:00| 2.55| 13748|United Kingdom|\n", "| 536374| 21258|VICTORIAN SEWING ...| 32|2010-12-01 09:09:00| 10.95| 15100|United Kingdom|\n", "| 536376| 22114|HOT WATER BOTTLE ...| 48|2010-12-01 09:32:00| 3.45| 15291|United Kingdom|\n", "| 536376| 21733|RED HANGING HEART...| 64|2010-12-01 09:32:00| 2.55| 15291|United Kingdom|\n", "| 536378| 21212|PACK OF 72 RETROS...| 120|2010-12-01 09:37:00| 0.42| 14688|United Kingdom|\n", "| 536378| 85183B|CHARLIE & LOLA WA...| 48|2010-12-01 09:37:00| 1.25| 14688|United Kingdom|\n", "| 536378| 85071B|RED CHARLIE+LOLA ...| 96|2010-12-01 09:37:00| 0.38| 14688|United Kingdom|\n", "| 536381| 22719|GUMBALL MONOCHROM...| 36|2010-12-01 09:41:00| 1.06| 
15311|United Kingdom|\n", "| 536382| 22381|TOY TIDY PINK POL...| 50|2010-12-01 09:45:00| 1.85| 16098|United Kingdom|\n", "| 536384| 84755|COLOUR GLASS T-LI...| 48|2010-12-01 09:53:00| 0.65| 18074|United Kingdom|\n", "| 536384| 22469|HEART OF WICKER S...| 40|2010-12-01 09:53:00| 1.45| 18074|United Kingdom|\n", "| 536384| 22470|HEART OF WICKER L...| 40|2010-12-01 09:53:00| 2.55| 18074|United Kingdom|\n", "| 536386| 84880|WHITE WIRE EGG HO...| 36|2010-12-01 09:57:00| 4.95| 16029|United Kingdom|\n", "| 536386| 85099C|JUMBO BAG BAROQU...| 100|2010-12-01 09:57:00| 1.65| 16029|United Kingdom|\n", "| 536386| 85099B|JUMBO BAG RED RET...| 100|2010-12-01 09:57:00| 1.65| 16029|United Kingdom|\n", "| 536387| 79321| CHILLI LIGHTS| 192|2010-12-01 09:58:00| 3.82| 16029|United Kingdom|\n", "| 536387| 22780|LIGHT GARLAND BUT...| 192|2010-12-01 09:58:00| 3.37| 16029|United Kingdom|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df.where(\"Quantity > 30\").show()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Show the four most sold items (by quantity)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------------+\n", "| Description|totalQuantity|\n", "+--------------------+-------------+\n", "|WORLD WAR 2 GLIDE...| 53847|\n", "|JUMBO BAG RED RET...| 47363|\n", "|ASSORTED COLOUR B...| 36381|\n", "| POPCORN HOLDER| 36334|\n", "+--------------------+-------------+\n", "only showing top 4 rows\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import desc, asc, expr\n", "(df.groupBy(\"Description\")\n", " .agg(expr(\"sum(Quantity) as totalQuantity\"))\n", " .sort(\"totalQuantity\", ascending=False)\n", " .show(4))\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { 
"slide_type": "subslide" } }, "source": [ "Bonus question: why do these two operations return different results? Hint: look at the documentation " ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "22062\n", "+-------------------------+\n", "|count(DISTINCT InvoiceNo)|\n", "+-------------------------+\n", "| 22061|\n", "+-------------------------+\n", "\n" ] } ], "source": [ "print(df.select(\"InvoiceNo\").distinct().count())\n", "from pyspark.sql.functions import countDistinct\n", "df.select(countDistinct(\"InvoiceNo\")).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see from the output of `countDistinct`, internally it runs `count(DISTINCT`, which excludes `null`s.\n", "\n", "https://spark.apache.org/docs/latest/api/sql/#count\n", "\n", "* `count()` Returns the total number of retrieved rows, including rows containing null\n", "\n", "* `count(DISTINCT expr[, expr...])` - Returns the number of rows for which the supplied expression(s) are unique and non-null." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "sparkconnect": { "bundled_options": [], "list_of_options": [] } }, "nbformat": 4, "nbformat_minor": 1 }