{ "cells": [ { "cell_type": "markdown", "id": "d6244506", "metadata": {}, "source": [ "# PySpark miscellanea\n", "\n", "

\n", "

\n", " \n", "


\n", "
\n", "\n" ] }, { "cell_type": "markdown", "id": "9dfe7c0a", "metadata": {}, "source": [ "## How to get your application's id in pyspark\n", "\n", "\n", "See also: [How to extract application ID from the PySpark context](https://stackoverflow.com/questions/30983226/how-to-extract-application-id-from-the-pyspark-context)\n", "\n" ] }, { "cell_type": "markdown", "id": "02dbf961", "metadata": {}, "source": [ "### Starting from a Spark session\n", "\n", "What is a [Spark session](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession)?" ] }, { "cell_type": "code", "execution_count": 1, "id": "0fd4ce7b", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"App ID\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "id": "d02813d3", "metadata": {}, "source": [ "Get the session's context ([what is a Spark context?](https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark) and [detailed documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html))." ] }, { "cell_type": "code", "execution_count": 2, "id": "7d8b99e3", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "


\n", "\n", "

Spark UI

\n", "\n", "
\n", "
\n", "
\n", "
\n", "
\n", "
\n", "
App ID
\n", "
\n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc = spark.sparkContext\n", "sc" ] }, { "cell_type": "markdown", "id": "57df230b", "metadata": {}, "source": [ "Get `applicationId` from the context." ] }, { "cell_type": "code", "execution_count": 3, "id": "bed1685a", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'local-1667946735598'" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc.applicationId" ] }, { "cell_type": "markdown", "id": "de33f1ce", "metadata": {}, "source": [ "All in one step:" ] }, { "cell_type": "code", "execution_count": 4, "id": "f0023074", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'local-1667946735598'" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.applicationId" ] }, { "cell_type": "markdown", "id": "dc31a770", "metadata": {}, "source": [ "**Note:** if you're using the _pyspark shell_ (see [using the shell](https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark)), `SparkContext` is created automatically and it can be accessed from the variable called `sc`." ] }, { "cell_type": "markdown", "id": "44ece7a4", "metadata": {}, "source": [ "## How to get/set default parallelism in pyspark" ] }, { "cell_type": "markdown", "id": "880e6ada", "metadata": {}, "source": [ "Create a [Spark session](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession)." 
] }, { "cell_type": "code", "execution_count": 5, "id": "451653da", "metadata": {}, "outputs": [], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"defaultParallelism\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "id": "d2fe39a2", "metadata": {}, "source": [ "Check the value of `defaultParallelism`:" ] }, { "cell_type": "code", "execution_count": 6, "id": "f221e3d1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "8" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.defaultParallelism" ] }, { "cell_type": "markdown", "id": "69b3994c", "metadata": {}, "source": [ "To change a property of a running Spark context, you have to stop the session and start a new one. Setting the property on the builder alone is not enough:" ] }, { "cell_type": "code", "execution_count": 7, "id": "4cf47f42", "metadata": {}, "outputs": [], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Set parallelism\") \\\n", " .config(\"spark.default.parallelism\", 4) \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "id": "c6725687", "metadata": {}, "source": [ "Default parallelism hasn't changed, because `getOrCreate()` returned the session that was already running!" ] }, { "cell_type": "code", "execution_count": 8, "id": "6b592d7e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "8" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.defaultParallelism" ] }, { "cell_type": "markdown", "id": "49e168d0", "metadata": {}, "source": [ "Stop the session and start a new one." 
] }, { "cell_type": "code", "execution_count": 9, "id": "da0512cd", "metadata": {}, "outputs": [], "source": [ "spark.stop()\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Set parallelism\") \\\n", " .config(\"spark.default.parallelism\", 4) \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 10, "id": "bba24e5e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "4" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.defaultParallelism" ] }, { "cell_type": "markdown", "id": "1ef9f91f", "metadata": {}, "source": [ "Great! Now the context has been changed (and the application's name has been updated as well)." ] }, { "cell_type": "code", "execution_count": 11, "id": "0e2fbc63", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "


\n", "\n", "

Spark UI

\n", "\n", "
\n", "
\n", "
\n", "
\n", "
\n", "
\n", "
Set parallelism
\n", "
\n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext" ] }, { "cell_type": "markdown", "id": "238722de", "metadata": {}, "source": [ "### What is `spark.default.parallelism`?\n", "\n", "This property sets the default number of partitions of the RDDs ([Resilient Distributed Datasets](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)) returned by transformations such as `join` and `reduceByKey`, and by `parallelize`, when the user does not specify the number of partitions.\n", "\n", "Unless set by the user, the value of `spark.default.parallelism` for operations without a parent RDD (such as `parallelize`) depends on the _cluster manager_:\n", " - in local mode: the number of (virtual) cores on the local machine\n", " - in Mesos fine-grained mode: 8\n", " - for all other cluster managers (e.g. standalone, YARN, Kubernetes): total number of cores on all executor nodes or 2, whichever is larger\n", " \n", " (see [Spark configuration/Execution behavior](https://spark.apache.org/docs/latest/configuration.html#execution-behavior))" ] }, { "cell_type": "markdown", "id": "8cb775c1", "metadata": {}, "source": [ "## About `spark-defaults.conf`" ] }, { "cell_type": "markdown", "id": "4cb5650c", "metadata": {}, "source": [ "The file `spark-defaults.conf` contains the default Spark configuration properties. By default it is located in Spark's configuration directory `$SPARK_HOME/conf` (see [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html)). \n", "\n", "Each line of `spark-defaults.conf` consists of a property name and its value, separated by whitespace, for instance:\n", "```\n", "spark.master spark://\n", "spark.executor.memory 4g\n", "spark.eventLog.enabled true\n", "spark.serializer org.apache.spark.serializer.KryoSerializer\n", "```" ] }, { "cell_type": "markdown", "id": "81a5dae5", "metadata": {}, "source": [ "### Where is my `spark-defaults.conf`?" 
] }, { "cell_type": "markdown", "id": "7ac1d292", "metadata": {}, "source": [ "If no file `spark-defaults.conf` is contained in Spark's configuration directory, you should find a _template_ configuration file `spark-defaults.conf.template`. You can rename this to `spark-defaults.conf` and use it as default configuration file.\n", "\n", "Let's look for all files called `spark-defaults*` in Spark's configuration directory:" ] }, { "cell_type": "code", "execution_count": 12, "id": "6fede028", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['/usr/local/spark-3.1.2-bin-hadoop3.2/conf/spark-defaults.conf.template']" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import glob, os\n", "glob.glob(os.path.join(os.environ[\"SPARK_HOME\"], \"conf\", \"spark-defaults*\"))" ] }, { "cell_type": "markdown", "id": "077ff7c0", "metadata": {}, "source": [ "We found a template `spark-defaults.conf` file. \n", "\n", "Save the output from last cell (`_`) in the variable `conf_file`." ] }, { "cell_type": "code", "execution_count": 13, "id": "fa368738", "metadata": {}, "outputs": [], "source": [ "conf_file = _" ] }, { "cell_type": "markdown", "id": "04ddcf43", "metadata": {}, "source": [ "Look at the contents of `spark-defaults.conf.template`" ] }, { "cell_type": "code", "execution_count": 14, "id": "4ab0f938", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#\n", "# Licensed to the Apache Software Foundation (ASF) under one or more\n", "# contributor license agreements. See the NOTICE file distributed with\n", "# this work for additional information regarding copyright ownership.\n", "# The ASF licenses this file to You under the Apache License, Version 2.0\n", "# (the \"License\"); you may not use this file except in compliance with\n", "# the License. 
You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "#\n", "\n", "# Default system properties included when running spark-submit.\n", "# This is useful for setting default environmental settings.\n", "\n", "# Example:\n", "# spark.master spark://master:7077\n", "# spark.eventLog.enabled true\n", "# spark.eventLog.dir hdfs://namenode:8021/directory\n", "# spark.serializer org.apache.spark.serializer.KryoSerializer\n", "# spark.driver.memory 5g\n", "# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers=\"one two three\"\n", "\n" ] } ], "source": [ "with open(conf_file[0], 'r') as f:\n", " print(f.read())" ] }, { "cell_type": "markdown", "id": "21692085", "metadata": {}, "source": [ "As you can see, everything is commented out in the template file. To set some of the properties, rename `spark-defaults.conf.template` to `spark-defaults.conf`, then uncomment and edit the properties you want to set as defaults." ] }, { "cell_type": "markdown", "id": "2bc0e3e7", "metadata": {}, "source": [ "### How to override the default configuration directory\n", "\n", "If you want to keep your Spark configuration files in a directory other than `$SPARK_HOME/conf`, you can set the environment variable `SPARK_CONF_DIR`. \n", "\n", "Spark will then look in `$SPARK_CONF_DIR` for all of its configuration files: `spark-defaults.conf`, `spark-env.sh`, `log4j2.properties` (`log4j.properties` in Spark versions before 3.3), etc. 
(see https://spark.apache.org/docs/latest/configuration.html#overriding-configuration-directory).\n", "\n", "Here's the list of files in the default Spark configuration directory:" ] }, { "cell_type": "code", "execution_count": 15, "id": "ff9642c7", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['spark-env.sh.template',\n", " 'fairscheduler.xml.template',\n", " 'metrics.properties.template',\n", " 'workers.template',\n", " 'log4j.properties.template',\n", " 'spark-defaults.conf.template',\n", " 'log4j.properties']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "os.listdir(os.path.join(os.environ[\"SPARK_HOME\"],\"conf\"))" ] }, { "cell_type": "markdown", "id": "91a61ed7", "metadata": {}, "source": [ "Now assume that you have no `spark-defaults.conf` and have not configured Spark anywhere else. Spark still uses _default values_ for its properties.\n", "\n", "Where are those properties defined, and how can you look up their default values?" 
] }, { "cell_type": "markdown", "id": "5fd5be6f", "metadata": {}, "source": [ "#### Spark configuration properties\n", "\n", "Spark's documentation provides the list of all [available properties](https://spark.apache.org/docs/latest/configuration.html#available-properties) grouped into several categories:\n", "\n", " - [application properties](https://spark.apache.org/docs/latest/configuration.html#application-properties)\n", " - [runtime environment](https://spark.apache.org/docs/latest/configuration.html#runtime-environment)\n", " - [shuffle behavior](https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)\n", " - [Spark UI](https://spark.apache.org/docs/latest/configuration.html#spark-ui)\n", " - [compression and serialization](https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization)\n", " - [memory management](https://spark.apache.org/docs/latest/configuration.html#memory-management)\n", " - [execution behavior](https://spark.apache.org/docs/latest/configuration.html#execution-behavior)\n", " - [executor metrics](https://spark.apache.org/docs/latest/configuration.html#executor-metrics)\n", " - [networking](https://spark.apache.org/docs/latest/configuration.html#networking)\n", " - [scheduling](https://spark.apache.org/docs/latest/configuration.html#scheduling)\n", " - [barrier execution mode](https://spark.apache.org/docs/latest/configuration.html#barrier-execution-mode)\n", " - [dynamic allocation](https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation)\n", " - [thread configurations](https://spark.apache.org/docs/latest/configuration.html#thread-configurations)\n", " - [security](https://spark.apache.org/docs/latest/security.html)\n", " - [Spark SQL](https://spark.apache.org/docs/latest/configuration.html#spark-sql)\n", " - [Spark streaming](https://spark.apache.org/docs/latest/configuration.html#spark-streaming)\n", " - 
[SparkR](https://spark.apache.org/docs/latest/configuration.html#sparkr)\n", " - [GraphX](https://spark.apache.org/docs/latest/configuration.html#graphx)\n", " - [Deploy](https://spark.apache.org/docs/latest/configuration.html#deploy)" ] }, { "cell_type": "markdown", "id": "98e1480f", "metadata": {}, "source": [ "Most properties have a reasonable default value that should accommodate most situations.\n", "\n", "As a beginner you might want to give your application a name by configuring `spark.app.name` and perhaps change the default values of the following properties:\n", " - `spark.master` and `spark.submit.deployMode` to define where the application should be deployed\n", " - `spark.driver.memory` and `spark.driver.maxResultSize` to control the memory usage of the driver\n", " - `spark.executor.memory` and `spark.executor.cores` to control executors\n", " \n", " \n", "For instance, let's create a new session that sets the application name and the driver memory. (Note that in client mode `spark.driver.memory` only takes effect if it is set before the driver JVM starts, so in a notebook where a session has already been running the value may be recorded without changing the driver's actual memory.)" ] }, { "cell_type": "code", "execution_count": 16, "id": "844047f9", "metadata": {}, "outputs": [], "source": [ "spark.stop()\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"my_app\") \\\n", " .config(\"spark.driver.memory\", \"2g\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "id": "f3c250e0", "metadata": {}, "source": [ "Show the properties that are explicitly set in the Spark context's configuration (properties left at their default value are not listed):" ] }, { "cell_type": "code", "execution_count": 17, "id": "9f25abd1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('spark.driver.port', '50313'),\n", " ('spark.app.id', 'local-1667946737593'),\n", " ('spark.sql.warehouse.dir',\n", " 'file:/Users/x/Documents/notebooks/spark-warehouse'),\n", " ('spark.executor.id', 'driver'),\n", " ('spark.app.startTime', '1667946737538'),\n", " ('spark.app.name', 'my_app'),\n", " ('spark.rdd.compress', 'True'),\n", " ('spark.driver.memory', '2g'),\n", " ('spark.serializer.objectStreamReset', '100'),\n", " ('spark.master', 'local[*]'),\n", " ('spark.submit.pyFiles', ''),\n", " ('spark.submit.deployMode', 'client'),\n", " 
('spark.default.parallelism', '4'),\n", " ('spark.ui.showConsoleProgress', 'true'),\n", " ('spark.driver.host', '')]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.getConf().getAll()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": true } }, "nbformat": 4, "nbformat_minor": 5 }