{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Getting started with Spark: Spark SQL in Python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This tutorial is based on [Spark SQL Guide - Getting started](https://spark.apache.org/docs/latest/sql-getting-started.html).\n", "\n", "For this demo we used the city of Vienna trees dataset (\"Baumkataster\") made available by [Open Data Österreich](https://www.data.gv.at) and downloadable from [here](https://www.data.gv.at/katalog/dataset/c91a4635-8b7d-43fe-9b27-d95dec8392a7) .\n", "\n", "# Table of contents\n", "1. [Spark session](#sparkSession)\n", "2. [Spark configuration](#sparkConfiguration)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Spark session \n", "\n", "We're going to start by creating a Spark _session_. Our Spark job will be named \"Python Spark SQL basic example\". `spark` is the variable holding our Spark session." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the file into a Spark [_dataframe_](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "df = spark.read \\\n", " .load(\"FME_BaumdatenBearbeitet_OGD_20190205.csv\",\n", " format=\"csv\", sep=\";\", header=\"true\", encoding=\"iso-8859-1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note:** we assume that the file `FME_BaumdatenBearbeitet_OGD_20190205.csv` is in your local directory. If at this point you get an error message that looks like `AnalysisException: 'Path does not exist` then check your [Spark configuration](#sparkConfig) for how to define the correct file path." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show first three lines of Spark dataframe" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "|Flaeche|BaumNr| Gattung| Art|Sorte|NameDeutsch|Hoehe|Schirmdurchmesser|Stammumfang|Typ| XPos| YPos| lon| lat|\n", "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "| 0| 0| ZumTesten| 0| 0| 20190205| 0| 0| 0| 0|70000|350000|14,2757549011314|48,2844031941042|\n", "| 870| 021a| Quercus| sp.| -| Eiche| 6| 3| 31| L|72431|354949|14,3093549528352|48,3286271802142|\n", "| 572| 127|Liriodendron|tulipifera| -| Tulpenbaum| 5| 2| 21| L|71171|353742|14,2921648325343|48,3179178510249|\n", "+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+\n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "df.show(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For pretty-printing you can use `toPandas()`" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", " | Flaeche | \n", "BaumNr | \n", "Gattung | \n", "Art | \n", "Sorte | \n", "NameDeutsch | \n", "Hoehe | \n", "Schirmdurchmesser | \n", "Stammumfang | \n", "Typ | \n", "XPos | \n", "YPos | \n", "lon | \n", "lat | \n", "
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | \n", "0 | \n", "0 | \n", "ZumTesten | \n", "0 | \n", "0 | \n", "20190205 | \n", "0 | \n", "0 | \n", "0 | \n", "0 | \n", "70000 | \n", "350000 | \n", "14,2757549011314 | \n", "48,2844031941042 | \n", "
1 | \n", "870 | \n", "021a | \n", "Quercus | \n", "sp. | \n", "- | \n", "Eiche | \n", "6 | \n", "3 | \n", "31 | \n", "L | \n", "72431 | \n", "354949 | \n", "14,3093549528352 | \n", "48,3286271802142 | \n", "
2 | \n", "572 | \n", "127 | \n", "Liriodendron | \n", "tulipifera | \n", "- | \n", "Tulpenbaum | \n", "5 | \n", "2 | \n", "21 | \n", "L | \n", "71171 | \n", "353742 | \n", "14,2921648325343 | \n", "48,3179178510249 | \n", "
\n", " spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .getOrCreate()\n", "\n", "\n", "Let us look at the rest of the Spark configuration." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('spark.app.id', 'local-1574021194317'),\n", " ('spark.driver.port', '34544'),\n", " ('spark.rdd.compress', 'True'),\n", " ('spark.serializer.objectStreamReset', '100'),\n", " ('spark.master', 'local[*]'),\n", " ('spark.executor.id', 'driver'),\n", " ('spark.app.name', 'Python Spark SQL basic example'),\n", " ('spark.submit.deployMode', 'client'),\n", " ('spark.driver.host', 'c100.local'),\n", " ('spark.ui.showConsoleProgress', 'true')]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.conf import SparkConf\n", "spark.sparkContext._conf.getAll()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The property `spark.app.name` is the name of our app that we just defined.\n", "\n", "Another important property is `spark.master`. This defines the _master URL_ for the Spark application. A list of all admissible values for `spark.master` is given here: [master-urls](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).\n", "\n", "In this example the Spark master URL is `local[*]`, this means that our Spark application will run locally with as many worker threads as logical cores on our local machine.\n", "\n", "If you have a Hadoop cluster available you can deploy your Spark application on Yarn by setting the option `spark.master = yarn`. Let's do that and then check the Spark configuration once again." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('spark.master', 'yarn'),\n", " ('spark.app.id', 'local-1574021194317'),\n", " ('spark.driver.port', '34544'),\n", " ('spark.rdd.compress', 'True'),\n", " ('spark.serializer.objectStreamReset', '100'),\n", " ('spark.executor.id', 'driver'),\n", " ('spark.app.name', 'Python Spark SQL basic example'),\n", " ('spark.submit.deployMode', 'client'),\n", " ('spark.driver.host', 'c100.local'),\n", " ('spark.ui.showConsoleProgress', 'true')]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Python Spark SQL basic example\") \\\n", " .master('yarn') \\\n", " .getOrCreate()\n", "\n", "spark.sparkContext._conf.getAll()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With this configuration our Spark application will run on the Hadoop cluster and its resources will be managed by Yarn.\n", "\n", "**Note:** If the Hadoop cluster is configured with HDFS as its default filesystem, then you need to upload your CSV file to Hadoop in order to be able to read it:\n", "
\n",
" hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv FME_BaumdatenBearbeitet_OGD_20190205.csv\n",
"
\n",
"and then you can just use `.load( ...) ` again."
]
},
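  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you are not sure whether HDFS is the default filesystem on your cluster, you can check from PySpark. The sketch below goes through `_jsc`, Spark's internal handle on the JVM context, so treat it as a convenience rather than a stable API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read Hadoop's fs.defaultFS setting via Spark's internal JVM gateway;\n",
    "# on an HDFS-backed cluster this prints something like hdfs://<namenode>:8020.\n",
    "print(spark.sparkContext._jsc.hadoopConfiguration().get(\"fs.defaultFS\"))"
   ]
  },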
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-rw-r--r-- 3 datalab supergroup 2623070 2019-11-17 20:53 FME_BaumdatenBearbeitet_OGD_20190205.csv\n"
]
}
],
"source": [
"%%bash\n",
"hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv \n",
"hdfs dfs -ls FME_BaumdatenBearbeitet_OGD_20190205.csv"
]
},
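  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`hdfs dfs -put` without a destination places the file in your HDFS home directory, which is also where relative paths passed to `spark.read.load(...)` resolve. Equivalently you could spell out a fully qualified URI; the path below is illustrative (it assumes the HDFS user `datalab` from the listing above), so adjust it to your own setup."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative fully qualified HDFS path; replace the user directory with your own.\n",
    "df = spark.read \\\n",
    "    .load(\"hdfs:///user/datalab/FME_BaumdatenBearbeitet_OGD_20190205.csv\",\n",
    "          format=\"csv\", sep=\";\", header=\"true\", encoding=\"iso-8859-1\")"
   ]
  },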
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"df = spark.read \\\n",
" .load(\"FME_BaumdatenBearbeitet_OGD_20190205.csv\",\n",
" format=\"csv\", sep=\";\", header=\"true\", encoding=\"iso-8859-1\")"
]
},
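  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since we neither supplied a schema nor asked Spark to infer one, every CSV column is read as a string; you can verify this with `printSchema()`. (Passing `inferSchema=\"true\"` to `load()` would give the numeric columns proper types.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Every field shows up as string because no schema was inferred or supplied.\n",
    "df.printSchema()"
   ]
  },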
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's now re-run the previous commands. This time the application is going to be deployed on the cluster."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------+--------------------+-----+----------------+----------------+\n",
"|BaumNr| NameDeutsch|Hoehe| lat| lon|\n",
"+------+--------------------+-----+----------------+----------------+\n",
"| 844| Weißdorn| 99|48,3198141692826|14,3032456456049|\n",
"| 005| Hainbuche| 99|48,3107341721266|14,2818381176027|\n",
"| 129| Weiß-Birke| 99|48,2764221420506|14,3029622419918|\n",
"| 037|Gemeine Kiefer / ...| 95|48,3021818905511|14,2769106844535|\n",
"| 007| Tulpenbaum| 90|48,2765758955147|14,3027882173276|\n",
"| 051| Weiß-Birke| 9|48,2671468323820|14,2832242240464|\n",
"| 001| Säulen-Hainbuche| 9|48,2973425597313|14,3138560746343|\n",
"| 009| Sommer-Linde| 9|48,2383321988402|14,3668869798697|\n",
"| 025| Schwarz-Birke| 9|48,2749605071833|14,2781176169409|\n",
"| 004| Rotbuche| 9|48,2768058741599|14,2821412254345|\n",
"| 011| Robinie| 9|48,2382302739335|14,3347263036350|\n",
"| 009| Götterbaum| 9|48,2766683843868|14,2814265266950|\n",
"| 014| Götterbaum| 9|48,2764131088558|14,2809709854517|\n",
"| 004| Rotbuche| 9|48,2768218589229|14,2821505374108|\n",
"| 002| Winter-Linde| 9|48,2960178194628|14,2818038617467|\n",
"| 001| Silber-Ahorn| 9|48,2777290002478|14,2797511856080|\n",
"| 036| Ahorn| 9|48,2335935143941|14,3798873585795|\n",
"| 047| Sommer-Linde| 9|48,2346725447652|14,3758058184719|\n",
"| 056| Hainbuche| 9|48,2784551013593|14,3103718222569|\n",
"| 022| Weiß-Birke| 9|48,2781732375800|14,2972684731463|\n",
"+------+--------------------+-----+----------------+----------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df.createOrReplaceTempView(\"baeume\")\n",
"spark.sql(\"SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume order by Hoehe desc\").show()"
]
},
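  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `Hoehe` is a string column, the `ORDER BY` above is lexicographic rather than numeric; that is why a height of `9` ranks directly below `90` instead of below `10`. A minimal fix is to cast the column inside the query:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cast Hoehe to an integer so that ORDER BY compares numbers, not strings.\n",
    "spark.sql(\"\"\"\n",
    "    SELECT BaumNr, NameDeutsch, CAST(Hoehe AS INT) AS Hoehe, lat, lon\n",
    "    FROM baeume\n",
    "    ORDER BY Hoehe DESC\n",
    "\"\"\").show(5)"
   ]
  },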
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Note:** After you're done, it's important to close the Spark session in order to release cluster resources."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"spark.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}