
<a href="https://github.com/groda/big_data"><div><img src="https://github.com/groda/big_data/blob/master/logo_bdb.png?raw=true" style="display:block;float:right;max-width:90px;" align=right></div></a>

# PySpark miscellanea
    
<h3>Table of Contents<span class="tocSkip"></span></h3>
<div class="toc"><ul class="toc-item"><li><span><a href="#How-to-get-your-application's-id-in-pyspark" data-toc-modified-id="How-to-get-your-application's-id-in-pyspark-1.1">How to get your application's id in pyspark</a></span></li><li><span><a href="#How-to-get/set-default-parallelism-in-pyspark" data-toc-modified-id="How-to-get/set-default-parallelism-in-pyspark-1.2">How to get/set default parallelism in pyspark</a></span></li><li><span><a href="#About--spark-defaults.conf" data-toc-modified-id="About--spark-defaults.conf-1.3">About  <code>spark-defaults.conf</code></a></span></li></ul></div>



### Imports

In [1]:
%%bash
pip install pyspark



## How to get your application's id in pyspark


See also: [How to extract application ID from the PySpark context](https://stackoverflow.com/questions/30983226/how-to-extract-application-id-from-the-pyspark-context)



### Starting from a Spark session

What is a [Spark session](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession)?

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("App ID") \
    .getOrCreate()

Get the session's context ([what is a Spark context?](https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark) and [detailed documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html)).

In [3]:
sc = spark.sparkContext
sc

Get `applicationId` from the context.

In [4]:
sc.applicationId

'local-1668378738271'

All in one step:

In [5]:
spark.sparkContext.applicationId

'local-1668378738271'

**Note:** if you're using the _pyspark shell_ (see [using the shell](https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark)), a `SparkContext` is created automatically and is available as the variable `sc`.

## How to get/set default parallelism in pyspark

Create a [Spark session](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession).

In [6]:
spark = SparkSession \
        .builder \
        .appName("defaultParallelism") \
        .getOrCreate()

Check the value of `defaultParallelism`:

In [7]:
spark.sparkContext.defaultParallelism

2

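To change a context-level property such as `spark.default.parallelism`, the current session has to be stopped and a new one started. Calling `getOrCreate()` while a session is running simply returns the existing session and leaves its `SparkContext` settings unchanged, as the next cells show.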

In [8]:
spark = SparkSession \
        .builder \
        .appName("Set parallelism") \
        .config("spark.default.parallelism", 8) \
        .getOrCreate()

Default parallelism hasn't changed!

In [9]:
spark.sparkContext.defaultParallelism

2

Stop the current session and start a new one.

In [10]:
spark.stop()
spark = SparkSession \
        .builder \
        .appName("Set parallelism") \
        .config("spark.default.parallelism", 4) \
        .getOrCreate()

In [11]:
spark.sparkContext.defaultParallelism

4

Great! Now the context has changed (and the application's name has been updated, too).

In [12]:
spark.sparkContext

### What is `spark.default.parallelism`?

This property sets the default number of partitions of RDDs ([Resilient Distributed Datasets](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)) returned by transformations such as `join`, `reduceByKey` and `parallelize` when the user does not specify the number of partitions.

Unless set by the user, the value of `spark.default.parallelism` depends on the _cluster manager_:
 - local mode: the number of cores on the local machine (more precisely, the number of threads requested with `local[N]`; `local[*]` uses all cores)
 - Mesos fine-grained mode: 8
 - others (standalone, YARN, Kubernetes): the total number of cores on all executor nodes or 2, whichever is larger

(see [Spark configuration/Execution behavior](https://spark.apache.org/docs/latest/configuration.html#execution-behavior))

## About  `spark-defaults.conf`

The file `spark-defaults.conf` contains the default Spark configuration properties and it is by default located in Spark's configuration directory `$SPARK_HOME/conf` (see [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html)). 

In `spark-defaults.conf`, each line contains a property name followed by its value, separated by whitespace, for instance:
```
spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer
```
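To illustrate the format, here is a small parser that reads such lines into a dictionary, skipping blank lines and `#` comments. `parse_spark_defaults` is a hypothetical helper, not part of Spark, and it only handles the simple "name, whitespace, value" form shown above; Spark itself loads this file as a Java properties file, which also accepts `=` or `:` as separators.

```python
def parse_spark_defaults(text):
    """Parse spark-defaults.conf style 'name  value' lines into a dict."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comment lines
        if not line or line.startswith("#"):
            continue
        # Split on the first run of whitespace; the value may contain spaces
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props

example = """
spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer
"""
print(parse_spark_defaults(example))
```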

### Where is my `spark-defaults.conf`?

Use `findspark` to find the location of `SPARK_HOME`.

In [13]:
!pip install findspark



In [14]:
import findspark
import os
findspark.init()
os.environ["SPARK_HOME"]

'/usr/local/lib/python3.7/dist-packages/pyspark'

Let's look for all files called `spark-defaults*` in Spark's configuration directory:

In [15]:
import glob
glob.glob(os.path.join(os.environ["SPARK_HOME"], "conf", "spark-defaults*"))

['/usr/local/lib/python3.7/dist-packages/pyspark/conf/spark-defaults.conf']

If Spark's configuration directory contains no `spark-defaults.conf`, look for a _template_ file called `spark-defaults.conf.template`: you can rename it to `spark-defaults.conf` and use it as the default configuration file.


If neither `spark-defaults.conf` nor the template is found, create `spark-defaults.conf` yourself, first creating the configuration directory if it does not exist.
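The two cases just described (use the template if there is one, otherwise start from scratch) can be combined into one helper. This is a sketch: `ensure_spark_defaults` is a hypothetical name, and it copies the template instead of renaming it so that the template stays available for reference.

```python
import os
import shutil
import tempfile

def ensure_spark_defaults(conf_dir):
    """Return the path of spark-defaults.conf in conf_dir, creating it if needed.

    Hypothetical helper: copies spark-defaults.conf.template when present,
    otherwise creates an empty file. An existing file is left untouched.
    """
    os.makedirs(conf_dir, exist_ok=True)
    target = os.path.join(conf_dir, "spark-defaults.conf")
    template = target + ".template"
    if not os.path.exists(target):
        if os.path.exists(template):
            # Copy rather than rename, so the template is kept
            shutil.copyfile(template, target)
        else:
            # No template either: start from an empty file
            open(target, "w").close()
    return target

# Example in a throwaway directory that contains only a template
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "spark-defaults.conf.template"), "w") as f:
    f.write("# spark.master  local[2]\n")
demo_path = ensure_spark_defaults(demo_dir)
print(open(demo_path).read())
```

In this notebook it would be called as `ensure_spark_defaults(os.path.join(os.environ["SPARK_HOME"], "conf"))`.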

In [16]:
conf_dir = os.path.join(os.environ["SPARK_HOME"], "conf")
# Create the configuration directory if it does not exist yet
os.makedirs(conf_dir, exist_ok=True)

In [17]:
%%writefile /usr/local/lib/python3.7/dist-packages/pyspark/conf/spark-defaults.conf
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

Overwriting /usr/local/lib/python3.7/dist-packages/pyspark/conf/spark-defaults.conf


As you can see, every property in the template file is commented out. Uncomment and edit the properties you want to set as defaults.

### How to override the default configuration directory

If you want to have your Spark configuration files in a directory other than `$SPARK_HOME/conf`, you can set the environment variable `SPARK_CONF_DIR`. 

Spark will then look in `$SPARK_CONF_DIR` for all of its configuration files: `spark-defaults.conf`, `spark-env.sh`, `log4j2.properties`, etc. (see https://spark.apache.org/docs/latest/configuration.html#overriding-configuration-directory).
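In a notebook, the variable has to be set before the first session is created, because the configuration directory is read when PySpark launches the JVM; changing it afterwards has no effect on the running context. A minimal sketch, where the temporary directory and the property value are illustrative assumptions:

```python
import os
import tempfile

# Illustrative custom configuration directory (any writable path would do)
custom_conf_dir = tempfile.mkdtemp(prefix="spark-conf-")
with open(os.path.join(custom_conf_dir, "spark-defaults.conf"), "w") as f:
    f.write("spark.default.parallelism  3\n")

# Must be set before the first SparkSession/SparkContext of this Python process
# is created: the JVM only reads the configuration directory at launch
os.environ["SPARK_CONF_DIR"] = custom_conf_dir
```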

Here's the list of files in the default Spark configuration directory:

In [18]:
os.listdir(os.path.join(os.environ["SPARK_HOME"],"conf"))

['spark-defaults.conf']

Now suppose you have no `spark-defaults.conf` and have not configured Spark anywhere else. Spark still has _default values_ for many properties.

Where are those properties defined, and how can you find their default values?

#### Spark configuration properties

Spark's documentation provides the list of all [available properties](https://spark.apache.org/docs/latest/configuration.html#available-properties) grouped into several categories:

 - [application properties](https://spark.apache.org/docs/latest/configuration.html#application-properties)
 - [runtime environment](https://spark.apache.org/docs/latest/configuration.html#runtime-environment)
 - [shuffle behavior](https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)
 - [Spark UI](https://spark.apache.org/docs/latest/configuration.html#spark-ui)
 - [compression and serialization](https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization)
 - [memory management](https://spark.apache.org/docs/latest/configuration.html#memory-management)
 - [execution behavior](https://spark.apache.org/docs/latest/configuration.html#execution-behavior)
 - [executor metrics](https://spark.apache.org/docs/latest/configuration.html#executor-metrics)
 - [networking](https://spark.apache.org/docs/latest/configuration.html#networking)
 - [scheduling](https://spark.apache.org/docs/latest/configuration.html#scheduling)
 - [barrier execution mode](https://spark.apache.org/docs/latest/configuration.html#barrier-execution-mode)
 - [dynamic allocation](https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation)
 - [thread configurations](https://spark.apache.org/docs/latest/configuration.html#thread-configurations)
 - [security](https://spark.apache.org/docs/latest/security.html)
 - [Spark SQL](https://spark.apache.org/docs/latest/configuration.html#spark-sql)
 - [Spark streaming](https://spark.apache.org/docs/latest/configuration.html#spark-streaming)
 - [SparkR](https://spark.apache.org/docs/latest/configuration.html#sparkr)
 - [GraphX](https://spark.apache.org/docs/latest/configuration.html#graphx)
 - [Deploy](https://spark.apache.org/docs/latest/configuration.html#deploy)

Most properties have a default value that should suit the majority of use cases.

As a beginner, you might want to name your application by setting `spark.app.name` and perhaps change the default values of the following properties:
 - `spark.master` and `spark.submit.deployMode` to define where the application should be deployed
 - `spark.driver.memory` and `spark.driver.maxResultSize` to control the memory usage of the driver
 - `spark.executor.memory` and `spark.executor.cores` to control executors
 
 
For instance, let's stop the current session and create a new one with a custom application name and driver memory:

In [19]:
spark.stop()
spark = SparkSession \
        .builder \
        .appName("my_app") \
        .config("spark.driver.memory", "2g") \
        .getOrCreate()

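Show the properties included in the Spark context. (According to the Spark documentation, in client mode `spark.driver.memory` should not be set from within the application, because the driver JVM has already started at that point; use the `--driver-memory` option or `spark-defaults.conf` instead.)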

In [20]:
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'c3218be40803'),
 ('spark.app.name', 'my_app'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/