{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## This notebook is part of Hadoop and Spark training delivered by CERN IT\n", "### Spark Structured Streaming Hands-On Lab\n", "Contact: Luca.Canali@cern.ch" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run this notebook from Jupyter with Python kernel\n", "- When using on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run locally on the SWAN container\n", "- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Spark Session, you need this to work with Spark\n", "\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder \\\n", " .appName(\"my streaming test app\") \\\n", " .master(\"local[*]\") \\\n", " .config(\"spark.driver.memory\",\"2g\") \\\n", " .config(\"spark.ui.showConsoleProgress\", \"false\") \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sets the path to the directory with datafiles\n", "PATH = \"../data/streaming/\"\n", "\n", "schema = \"timestamp int, name string, value double\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the input part of the streaming pipeline\n", "# This reads all the .csv files in a given directory\n", "# It checks continuosly for arrival of new files\n", "\n", "input_path = PATH + \"*.csv\"\n", "input_stream = (spark.readStream.format(\"csv\")\n", " .option(\"header\",\"true\")\n", " .schema(schema)\n", " .option(\"path\", input_path)\n", " .load())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Defines an output stream of the pipeline, this writes data to a view in memory\n", "# Use for testing, in a real case you would write to files and/or Kafka\n", "#\n", "# Delete the checkpoint dir if it already exists\n", "# ! rm -r myStreamingCheckPoint1\n", "\n", "raw_stream = (input_stream.writeStream \n", " .queryName(\"data_read\")\n", " .outputMode(\"append\")\n", " .format(\"memory\")\n", " .option(\"checkpointLocation\", \"myStreamingCheckPoint1\") \n", " .start())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.table(\"data_read\").printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the output table\n", "# Run this multiple times, as you add csv files with data in the input_path directory\n", "\n", "spark.sql(\"select * from data_read\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "raw_stream.status" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This maps the input_stream to a temporary view, so that we can work with it using SQL\n", "input_stream.createOrReplaceTempView(\"input_stream\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Use Spark SQL to describe the aggregation and tranformation on streaming data\n", "df = spark.sql(\"\"\"\n", "select name||'_aggregated' as name_aggregated, count(*) as n_points, sum(value) sum_values \n", "from input_stream \n", "group by name\"\"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Defines another output stream for the pipeline\n", "aggregated_stream = (df.writeStream\n", " .queryName(\"data_aggregated\")\n", " .outputMode(\"complete\")\n", " .format(\"memory\")\n", " .option(\"checkpointLocation\", \"myStreamingCheckPoint2\") \n", " .start())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the table with aggregated data, this is updated as new data arrives in the input pipeline\n", "spark.sql(\"select * from data_aggregated\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simulate the arrival of streaming data\n", "Add more data in form of .csv files to the input_path folder\n", "and run the queries of the output streams again" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the output table\n", "# Run this multiple times, as you add csv files with data in the input_path directory\n", "\n", "spark.sql(\"select * from data_read\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the table with aggregated data\n", "# this is updated as new data arrives in the input pipeline\n", "\n", "spark.sql(\"select * from data_aggregated\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# stop everything\n", "raw_stream.stop()\n", "aggregated_stream.stop()\n", "spark.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 2 }