{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Getting started with Spark: Spark SQL in Python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This tutorial is based on [Spark SQL Guide - Getting started](https://spark.apache.org/docs/latest/sql-getting-started.html).\n", "\n", "For this demo we used the city of Vienna trees dataset (\"Baumkataster\") made available by [Open Data Österreich](https://www.data.gv.at) and downloadable from [here](https://www.data.gv.at/katalog/dataset/c91a4635-8b7d-43fe-9b27-d95dec8392a7) .\n", "\n", "# Table of contents\n", "1. [Spark session](#sparkSession)\n", "2. [Spark configuration](#sparkConfiguration)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Spark session \n", "\n", "We're going to start by creating a Spark _session_. Our Spark job will be named \"Python Spark SQL basic example\". `spark` is the variable holding our Spark session." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the file into a Spark [_dataframe_](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "df = spark.read \\\n", " .load(\"FME_BaumdatenBearbeitet_OGD_20190205.csv\",\n", " format=\"csv\", sep=\";\", header=\"true\", encoding=\"iso-8859-1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note:** we assume that the file `FME_BaumdatenBearbeitet_OGD_20190205.csv` is in your local directory. If at this point you get an error message that looks like `AnalysisException: 'Path does not exist` then check your [Spark configuration](#sparkConfig) for how to define the correct file path." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show first three lines of Spark dataframe" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "|Flaeche|BaumNr| Gattung| Art|Sorte|NameDeutsch|Hoehe|Schirmdurchmesser|Stammumfang|Typ| XPos| YPos| lon| lat|\n", "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "| 0| 0| ZumTesten| 0| 0| 20190205| 0| 0| 0| 0|70000|350000|14,2757549011314|48,2844031941042|\n", "| 870| 021a| Quercus| sp.| -| Eiche| 6| 3| 31| L|72431|354949|14,3093549528352|48,3286271802142|\n", "| 572| 127|Liriodendron|tulipifera| -| Tulpenbaum| 5| 2| 21| L|71171|353742|14,2921648325343|48,3179178510249|\n", "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "df.show(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For pretty-printing you can use `toPandas()`" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
FlaecheBaumNrGattungArtSorteNameDeutschHoeheSchirmdurchmesserStammumfangTypXPosYPoslonlat
000ZumTesten002019020500007000035000014,275754901131448,2844031941042
1870021aQuercussp.-Eiche6331L7243135494914,309354952835248,3286271802142
2572127Liriodendrontulipifera-Tulpenbaum5221L7117135374214,292164832534348,3179178510249
\n", "
" ], "text/plain": [ " Flaeche BaumNr Gattung Art Sorte NameDeutsch Hoehe \\\n", "0 0 0 ZumTesten 0 0 20190205 0 \n", "1 870 021a Quercus sp. - Eiche 6 \n", "2 572 127 Liriodendron tulipifera - Tulpenbaum 5 \n", "\n", " Schirmdurchmesser Stammumfang Typ XPos YPos lon \\\n", "0 0 0 0 70000 350000 14,2757549011314 \n", "1 3 31 L 72431 354949 14,3093549528352 \n", "2 2 21 L 71171 353742 14,2921648325343 \n", "\n", " lat \n", "0 48,2844031941042 \n", "1 48,3286271802142 \n", "2 48,3179178510249 " ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.toPandas().head(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show number of different trees (count German names in `df` and sort by count)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-----+\n", "| NameDeutsch|count|\n", "+--------------------+-----+\n", "| Winter-Linde| 1583|\n", "| Weiß-Birke| 1442|\n", "| Spitz-Ahorn| 1273|\n", "| Stiel-Eiche| 1228|\n", "| Ahorn| 1079|\n", "| Hainbuche| 1036|\n", "| Gemeine Esche| 987|\n", "|Ahornblättrige-Pl...| 961|\n", "| Rotbuche| 747|\n", "| Linde| 688|\n", "| Feld-Ahorn| 637|\n", "| Berg-Ahorn| 566|\n", "| Säulen-Hainbuche| 507|\n", "|Gemeine Rosskastanie| 461|\n", "| Tulpenbaum| 436|\n", "| Robinie| 358|\n", "| Silber-Weide| 353|\n", "| Schwarz-Kiefer| 321|\n", "| Baumhasel| 258|\n", "| Serbische Fichte| 257|\n", "+--------------------+-----+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df.groupBy(\"NameDeutsch\").count().orderBy('count', ascending=False).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An example of SQL query (see [Running SQL Queries Programmatically](https://spark.apache.org/docs/latest/sql-getting-started.html#running-sql-queries-programmatically)): let's sort trees by height (\"Hoehe\")." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "df.createOrReplaceTempView(\"baeume\")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+--------------------+-----+----------------+----------------+\n", "|BaumNr| NameDeutsch|Hoehe| lat| lon|\n", "+------+--------------------+-----+----------------+----------------+\n", "| 844| Weißdorn| 99|48,3198141692826|14,3032456456049|\n", "| 005| Hainbuche| 99|48,3107341721266|14,2818381176027|\n", "| 129| Weiß-Birke| 99|48,2764221420506|14,3029622419918|\n", "| 037|Gemeine Kiefer / ...| 95|48,3021818905511|14,2769106844535|\n", "| 007| Tulpenbaum| 90|48,2765758955147|14,3027882173276|\n", "| 051| Weiß-Birke| 9|48,2671468323820|14,2832242240464|\n", "| 001| Säulen-Hainbuche| 9|48,2973425597313|14,3138560746343|\n", "| 009| Sommer-Linde| 9|48,2383321988402|14,3668869798697|\n", "| 025| Schwarz-Birke| 9|48,2749605071833|14,2781176169409|\n", "| 004| Rotbuche| 9|48,2768058741599|14,2821412254345|\n", "| 011| Robinie| 9|48,2382302739335|14,3347263036350|\n", "| 009| Götterbaum| 9|48,2766683843868|14,2814265266950|\n", "| 014| Götterbaum| 9|48,2764131088558|14,2809709854517|\n", "| 004| Rotbuche| 9|48,2768218589229|14,2821505374108|\n", "| 002| Winter-Linde| 9|48,2960178194628|14,2818038617467|\n", "| 001| Silber-Ahorn| 9|48,2777290002478|14,2797511856080|\n", "| 036| Ahorn| 9|48,2335935143941|14,3798873585795|\n", "| 047| Sommer-Linde| 9|48,2346725447652|14,3758058184719|\n", "| 056| Hainbuche| 9|48,2784551013593|14,3103718222569|\n", "| 022| Weiß-Birke| 9|48,2781732375800|14,2972684731463|\n", "+------+--------------------+-----+----------------+----------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "spark.sql(\"SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume order by Hoehe desc\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Look closely at the result: the heights run 99, 95, 90 and then drop straight to 9. We read the CSV without schema inference, so every column (including `Hoehe`) is a string, and `ORDER BY Hoehe DESC` therefore sorts lexicographically rather than numerically. To sort by the actual height, cast the column first, e.g. `ORDER BY CAST(Hoehe AS INT) DESC`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Spark configuration <a id=\"sparkConfiguration\"></a>\n", "\n", "Spark properties control most application settings and are configured separately for each application. These properties can be set directly on a `SparkConf` passed to your `SparkContext` (from the [Apache Spark documentation](https://spark.apache.org/docs/latest/configuration.html#spark-properties)). \n", "\n", "We've already seen how to modify the `SparkConf` when we created our Spark session with the command:\n", "
\n",
    "    spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Python Spark SQL basic example\") \\\n",
    "    .getOrCreate()\n",
    "
\n", "\n", "Let us look at the rest of the Spark configuration." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('spark.app.id', 'local-1574021194317'),\n", " ('spark.driver.port', '34544'),\n", " ('spark.rdd.compress', 'True'),\n", " ('spark.serializer.objectStreamReset', '100'),\n", " ('spark.master', 'local[*]'),\n", " ('spark.executor.id', 'driver'),\n", " ('spark.app.name', 'Python Spark SQL basic example'),\n", " ('spark.submit.deployMode', 'client'),\n", " ('spark.driver.host', 'c100.local'),\n", " ('spark.ui.showConsoleProgress', 'true')]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.conf import SparkConf\n", "spark.sparkContext._conf.getAll()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The property `spark.app.name` is the name of our app that we just defined.\n", "\n", "Another important property is `spark.master`. This defines the _master URL_ for the Spark application. A list of all admissible values for `spark.master` is given here: [master-urls](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).\n", "\n", "In this example the Spark master URL is `local[*]`, this means that our Spark application will run locally with as many worker threads as logical cores on our local machine.\n", "\n", "If you have a Hadoop cluster available you can deploy your Spark application on Yarn by setting the option `spark.master = yarn`. Let's do that and then check the Spark configuration once again." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('spark.master', 'yarn'),\n", " ('spark.app.id', 'local-1574021194317'),\n", " ('spark.driver.port', '34544'),\n", " ('spark.rdd.compress', 'True'),\n", " ('spark.serializer.objectStreamReset', '100'),\n", " ('spark.executor.id', 'driver'),\n", " ('spark.app.name', 'Python Spark SQL basic example'),\n", " ('spark.submit.deployMode', 'client'),\n", " ('spark.driver.host', 'c100.local'),\n", " ('spark.ui.showConsoleProgress', 'true')]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .master('yarn') \\\n", " .getOrCreate()\n", "\n", "spark.sparkContext._conf.getAll()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With this configuration our Spark application will run on the Hadoop cluster and its resources will be managed by Yarn.\n", "\n", "**Note:** If the Hadoop cluster is configured with HDFS as its default filesystem, then you need to upload your CSV file to Hadoop in order to be able to read it:\n", "\n", " hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv FME_BaumdatenBearbeitet_OGD_20190205.csv\n", "\n", "and then you can just use `.load( ...) ` again." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 3 datalab supergroup 2623070 2019-11-17 20:53 FME_BaumdatenBearbeitet_OGD_20190205.csv\n" ] } ], "source": [ "%%bash\n", "hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv FME_BaumdatenBearbeitet_OGD_20190205.csv\n", "hdfs dfs -ls FME_BaumdatenBearbeitet_OGD_20190205.csv" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "df = spark.read \\\n", " .load(\"FME_BaumdatenBearbeitet_OGD_20190205.csv\",\n", " format=\"csv\", sep=\";\", header=\"true\", encoding=\"iso-8859-1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now re-run the previous commands. This time the application is going to be deployed on the cluster." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+--------------------+-----+----------------+----------------+\n", "|BaumNr| NameDeutsch|Hoehe| lat| lon|\n", "+------+--------------------+-----+----------------+----------------+\n", "| 844| Weißdorn| 99|48,3198141692826|14,3032456456049|\n", "| 005| Hainbuche| 99|48,3107341721266|14,2818381176027|\n", "| 129| Weiß-Birke| 99|48,2764221420506|14,3029622419918|\n", "| 037|Gemeine Kiefer / ...| 95|48,3021818905511|14,2769106844535|\n", "| 007| Tulpenbaum| 90|48,2765758955147|14,3027882173276|\n", "| 051| Weiß-Birke| 9|48,2671468323820|14,2832242240464|\n", "| 001| Säulen-Hainbuche| 9|48,2973425597313|14,3138560746343|\n", "| 009| Sommer-Linde| 9|48,2383321988402|14,3668869798697|\n", "| 025| Schwarz-Birke| 9|48,2749605071833|14,2781176169409|\n", "| 004| Rotbuche| 9|48,2768058741599|14,2821412254345|\n", "| 011| Robinie| 9|48,2382302739335|14,3347263036350|\n", "| 009| Götterbaum| 9|48,2766683843868|14,2814265266950|\n", "| 014| Götterbaum| 9|48,2764131088558|14,2809709854517|\n", "| 004| Rotbuche| 9|48,2768218589229|14,2821505374108|\n", "| 002| Winter-Linde| 9|48,2960178194628|14,2818038617467|\n", "| 001| Silber-Ahorn| 9|48,2777290002478|14,2797511856080|\n", "| 036| Ahorn| 9|48,2335935143941|14,3798873585795|\n", "| 047| Sommer-Linde| 9|48,2346725447652|14,3758058184719|\n", "| 056| Hainbuche| 9|48,2784551013593|14,3103718222569|\n", "| 022| Weiß-Birke| 9|48,2781732375800|14,2972684731463|\n", "+------+--------------------+-----+----------------+----------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df.createOrReplaceTempView(\"baeume\")\n", "spark.sql(\"SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume order by Hoehe desc\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note:** after you're done, it's important to close the Spark session with `spark.stop()` in order to release the cluster resources." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 2 }