{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## This notebook is part of Hadoop and Spark training delivered by CERN IT\n",
    "### Spark Structured Streaming Hands-On Lab\n",
    "Contact: Luca.Canali@cern.ch"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run this notebook from Jupyter with Python kernel\n",
    "- When using on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run locally on the SWAN container\n",
    "- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`"
   ]
  },
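  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional sanity check: the next cell simply imports PySpark and prints its version, to confirm that the kernel can find it (assuming PySpark is installed as described above) before building the streaming pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: verify that PySpark is importable and show its version\n",
    "import pyspark\n",
    "\n",
    "print(\"PySpark version:\", pyspark.__version__)"
   ]
  },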
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create Spark Session, you need this to work with Spark\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder \\\n",
    "        .appName(\"my streaming test app\")  \\\n",
    "        .master(\"local[*]\") \\\n",
    "        .config(\"spark.driver.memory\",\"2g\") \\\n",
    "        .config(\"spark.ui.showConsoleProgress\", \"false\") \\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# sets the path to the directory with datafiles\n",
    "PATH = \"../data/streaming/\"\n",
    "\n",
    "schema = \"timestamp int, name string, value double\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the input part of the streaming pipeline\n",
    "# This reads all the .csv files in a given directory\n",
    "# It checks continuosly for arrival of new files\n",
    "\n",
    "input_path = PATH + \"*.csv\"\n",
    "input_stream = (spark.readStream.format(\"csv\")\n",
    "               .option(\"header\",\"true\")\n",
    "               .schema(schema)\n",
    "               .option(\"path\", input_path)\n",
    "               .load())"
   ]
  },
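  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before starting any output stream, it can be useful to see which .csv files are already in the input folder. The cell below is a minimal sketch using Python's glob module, assuming the relative path above resolves from the notebook's working directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the .csv files currently present in the input directory\n",
    "# (assumes the relative PATH above resolves from the notebook's working directory)\n",
    "import glob\n",
    "\n",
    "sorted(glob.glob(input_path))"
   ]
  },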
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Defines an output stream of the pipeline, this writes data to a view in memory\n",
    "# Use for testing, in a real case you would write to files and/or Kafka\n",
    "#\n",
    "# Delete the checkpoint dir if it already exists\n",
    "# ! rm -r myStreamingCheckPoint1\n",
    "\n",
    "raw_stream = (input_stream.writeStream \n",
    "             .queryName(\"data_read\")\n",
    "             .outputMode(\"append\")\n",
    "             .format(\"memory\")\n",
    "             .option(\"checkpointLocation\", \"myStreamingCheckPoint1\") \n",
    "             .start())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.table(\"data_read\").printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query the output table\n",
    "# Run this multiple times, as you add csv files with data in the input_path directory\n",
    "\n",
    "spark.sql(\"select * from data_read\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "raw_stream.status"
   ]
  },
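  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Besides `status`, a running streaming query also exposes progress metrics: `lastProgress` reports on the most recent micro-batch (it can be `None` before the first batch has completed), and `recentProgress` keeps a short history."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Progress metrics from the most recent micro-batch\n",
    "# (may be None if no micro-batch has completed yet)\n",
    "raw_stream.lastProgress"
   ]
  },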
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This maps the input_stream to a temporary view, so that we can work with it using SQL\n",
    "input_stream.createOrReplaceTempView(\"input_stream\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use Spark SQL to describe the aggregation and tranformation on streaming data\n",
    "df = spark.sql(\"\"\"\n",
    "select name||'_aggregated' as name_aggregated, count(*) as n_points, sum(value) sum_values \n",
    "from input_stream \n",
    "group by name\"\"\")"
   ]
  },
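  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same aggregation can also be expressed with the DataFrame API instead of SQL. The cell below is a minimal sketch of an equivalent streaming DataFrame; it is only defined, not started as a query, and the name `df_api` is just illustrative."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Equivalent aggregation expressed with the DataFrame API\n",
    "# (defined only, not started as a streaming query here)\n",
    "from pyspark.sql.functions import concat, lit, count, sum as sum_\n",
    "\n",
    "df_api = (input_stream\n",
    "          .groupBy(\"name\")\n",
    "          .agg(count(\"*\").alias(\"n_points\"), sum_(\"value\").alias(\"sum_values\"))\n",
    "          .withColumn(\"name_aggregated\", concat(\"name\", lit(\"_aggregated\")))\n",
    "          .select(\"name_aggregated\", \"n_points\", \"sum_values\"))"
   ]
  },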
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Defines another output stream for the pipeline\n",
    "aggregated_stream = (df.writeStream\n",
    "                    .queryName(\"data_aggregated\")\n",
    "                    .outputMode(\"complete\")\n",
    "                    .format(\"memory\")\n",
    "                    .option(\"checkpointLocation\", \"myStreamingCheckPoint2\") \n",
    "                    .start())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query the table with aggregated data, this is updated as new data arrives in the input pipeline\n",
    "spark.sql(\"select * from data_aggregated\").show()"
   ]
  },
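  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "At this point two streaming queries are running in the same Spark session. The cell below lists their names via the session's StreamingQueryManager."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the names of the streaming queries currently active in this Spark session\n",
    "[q.name for q in spark.streams.active]"
   ]
  },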
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "----"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Simulate the arrival of streaming data\n",
    "Add more data in form of .csv files to the input_path folder\n",
    "and run the queries of the output streams again"
   ]
  },
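  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to generate such a file is sketched below: it writes a small .csv file, with the same header and schema as the input data, into the input folder. The file name and sample values are made up for illustration; adjust them as needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Write a small .csv file into the input folder to simulate the arrival of new data\n",
    "# The file name and the sample values are arbitrary, chosen only for illustration\n",
    "csv_content = \"\"\"timestamp,name,value\n",
    "1577880000,temperature,21.5\n",
    "1577880060,temperature,21.7\n",
    "1577880000,pressure,1013.2\n",
    "\"\"\"\n",
    "\n",
    "with open(PATH + \"extra_data_1.csv\", \"w\") as f:\n",
    "    f.write(csv_content)"
   ]
  },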
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query the output table\n",
    "# Run this multiple times, as you add csv files with data in the input_path directory\n",
    "\n",
    "spark.sql(\"select * from data_read\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query the table with aggregated data\n",
    "# this is updated as new data arrives in the input pipeline\n",
    "\n",
    "spark.sql(\"select * from data_aggregated\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# stop everything\n",
    "raw_stream.stop()\n",
    "aggregated_stream.stop()\n",
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}