In [1]:
import os, sys, glob, datetime

# Paths to the Spark distribution and to the Python executable; both are
# exported to the environment below.
spark_home = "/home/zero/spark-2.4.0-bin-hadoop2.7" # MODIFY THIS
python_path = "/apps/anaconda2/bin/python"

# Publish all three settings to the process environment in one call.
os.environ.update({
    'SPARK_HOME': spark_home,
    'PYSPARK_PYTHON': python_path,
    'SPARK_LOCAL_IP': "127.0.0.1",
})

def setup_spark_env(app_name, spark_home_dir=None):
    """Make the bundled PySpark importable and set PYSPARK_SUBMIT_ARGS.

    Prepends ``<spark home>/python`` and the first ``py4j-*.zip`` archive found
    under ``<spark home>/python/lib`` to ``sys.path``, then stores the submit
    arguments (local master with 2 threads, a timestamped application name,
    and the ``pyspark-shell`` resource) in ``os.environ``.

    :param app_name: prefix of the Spark application name; the current time
        formatted as ``%Y%m%d %H:%M`` is appended after an underscore.
    :param spark_home_dir: root of the Spark distribution. Defaults to the
        module-level ``spark_home`` when omitted.
    :raises IndexError: if no ``py4j-*.zip`` archive exists under
        ``<spark home>/python/lib`` (usually a wrong Spark home path).
    :return: None
    """
    if spark_home_dir is None:
        spark_home_dir = spark_home

    spark_python = os.path.join(spark_home_dir, 'python')
    py4j_pattern = os.path.join(spark_python, 'lib', 'py4j-*.zip')
    py4j_archives = glob.glob(py4j_pattern)
    if not py4j_archives:
        # Same exception type an unguarded `[0]` would raise, but with a
        # message that points at the likely cause.
        raise IndexError(
            "no py4j archive matches %r; check the Spark home path" % py4j_pattern)
    py4j = py4j_archives[0]

    # Put both entries at the front of sys.path, dropping earlier copies so
    # repeated calls do not accumulate duplicates.
    front_entries = [spark_python, py4j]
    sys.path[:] = front_entries + [p for p in sys.path if p not in front_entries]

    # specify Spark application parameters
    PYSPARK_SUBMIT_ARGS = "--master local[2]"
    timestamp = datetime.datetime.now().strftime("%Y%m%d %H:%M")
    os.environ['PYSPARK_SUBMIT_ARGS'] = (PYSPARK_SUBMIT_ARGS
        + " --name '%s_%s'" % (app_name, timestamp)
        + " pyspark-shell")

#
# Prepare sys.path and PYSPARK_SUBMIT_ARGS before the shell script runs.
setup_spark_env("your_name_test_spark") # MODIFY THIS
# launching PySpark application
# Run PySpark's shell bootstrap script in this namespace; the names used
# afterwards (`sc`, and later `spark` / `sqlContext`) come from it.
# `execfile` exists only in Python 2, so compile + exec is used instead,
# which works on both Python 2 and Python 3.
shell_script = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_script) as shell_file:
    exec(compile(shell_file.read(), shell_script, 'exec'))
sc.setLogLevel('ERROR')
print("{}".format(sc.applicationId))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.15 (default, Dec 14 2018 19:04:19)
SparkSession available as 'spark'.
local-1560753877291


In [2]:
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
import numpy as np
import pandas as pd

# Create sample data

In [3]:
# Sample rows, one [product, category, revenue] entry per product.
data = [
    ["Thin", "Cell phone", 6000],
    ["Normal", "Tablet", 1500],
    ["Mini", "Tablet", 5500],
    ["Ultra thin", "Cell phone", 5000],
    ["Very thin", "Cell phone", 6000],
    ["Big", "Tablet", 2500],
    ["Bendable", "Cell phone", 3000],
    ["Foldable", "Cell phone", 3000],
    ["Pro", "Tablet", 4500],
    ["Pro2", "Tablet", 6500],
]
column_names = ["product", "category", "revenue"]

# Build the pandas frame and display all ten rows.
productReveneue = pd.DataFrame(data=data, columns=column_names)
productReveneue.head(10)

Unnamed: 0,product,category,revenue
0,Thin,Cell phone,6000
1,Normal,Tablet,1500
2,Mini,Tablet,5500
3,Ultra thin,Cell phone,5000
4,Very thin,Cell phone,6000
5,Big,Tablet,2500
6,Bendable,Cell phone,3000
7,Foldable,Cell phone,3000
8,Pro,Tablet,4500
9,Pro2,Tablet,6500


In [4]:
# convert to Spark Dataframe
# Use the SparkSession (`spark`, created by the PySpark shell script) rather
# than the legacy `sqlContext` wrapper, which Spark 2.x keeps only for
# backward compatibility; the resulting DataFrame is the same.
df_productReveneue = spark.createDataFrame(productReveneue)
df_productReveneue.show()

+----------+----------+-------+
|   product|  category|revenue|
+----------+----------+-------+
|      Thin|Cell phone|   6000|
|    Normal|    Tablet|   1500|
|      Mini|    Tablet|   5500|
|Ultra thin|Cell phone|   5000|
| Very thin|Cell phone|   6000|
|       Big|    Tablet|   2500|
|  Bendable|Cell phone|   3000|
|  Foldable|Cell phone|   3000|
|       Pro|    Tablet|   4500|
|      Pro2|    Tablet|   6500|
+----------+----------+-------+



# Ranking functions

In [5]:
# sf.dense_rank()
# Window function: rank of each row within its window partition, without gaps.
#
# rank vs. dense_rank: tied rows share a rank in both. After a tie,
# dense_rank continues with the very next integer, while rank skips one
# position per tied row. Example: three competitors tie for second place.
# All three are ranked 2; the next competitor is ranked 3 by dense_rank
# but 5 by rank.
#
# Equivalent to the DENSE_RANK function in SQL.
query_window = (
    Window
    .partitionBy("category")
    .orderBy("revenue")  # ascending: the lowest revenue gets rank 1
)
dense_rank_col = sf.dense_rank().over(query_window)
df_query = df_productReveneue.withColumn("dense_rank", dense_rank_col)
df_query.show()

+----------+----------+-------+----------+
|   product|  category|revenue|dense_rank|
+----------+----------+-------+----------+
|  Bendable|Cell phone|   3000|         1|
|  Foldable|Cell phone|   3000|         1|
|Ultra thin|Cell phone|   5000|         2|
|      Thin|Cell phone|   6000|         3|
| Very thin|Cell phone|   6000|         3|
|    Normal|    Tablet|   1500|         1|
|       Big|    Tablet|   2500|         2|
|       Pro|    Tablet|   4500|         3|
|      Mini|    Tablet|   5500|         4|
|      Pro2|    Tablet|   6500|         5|
+----------+----------+-------+----------+



In [6]:
# sf.rank()
# Window function: rank of each row within its window partition. Tied rows
# share a rank and the ranks that follow are skipped (1, 1, 3, ...).
query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
rank_col = sf.rank().over(query_window)
df_query = df_productReveneue.withColumn("rank", rank_col)
df_query.show()

+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Thin|Cell phone|   6000|   1|
| Very thin|Cell phone|   6000|   1|
|Ultra thin|Cell phone|   5000|   3|
|  Bendable|Cell phone|   3000|   4|
|  Foldable|Cell phone|   3000|   4|
|      Pro2|    Tablet|   6500|   1|
|      Mini|    Tablet|   5500|   2|
|       Pro|    Tablet|   4500|   3|
|       Big|    Tablet|   2500|   4|
|    Normal|    Tablet|   1500|   5|
+----------+----------+-------+----+



In [7]:
# sf.percent_rank()
# Window function: relative rank (percentile) of each row within its window
# partition, i.e. (rank - 1) / (rows in partition - 1).
query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
percent_rank_col = sf.percent_rank().over(query_window)
df_query = df_productReveneue.withColumn("percent_rank", percent_rank_col)
df_query.show()

+----------+----------+-------+------------+
|   product|  category|revenue|percent_rank|
+----------+----------+-------+------------+
|      Thin|Cell phone|   6000|         0.0|
| Very thin|Cell phone|   6000|         0.0|
|Ultra thin|Cell phone|   5000|         0.5|
|  Bendable|Cell phone|   3000|        0.75|
|  Foldable|Cell phone|   3000|        0.75|
|      Pro2|    Tablet|   6500|         0.0|
|      Mini|    Tablet|   5500|        0.25|
|       Pro|    Tablet|   4500|         0.5|
|       Big|    Tablet|   2500|        0.75|
|    Normal|    Tablet|   1500|         1.0|
+----------+----------+-------+------------+



In [8]:
# sf.ntile(n)
# Window function: splits the ordered rows of each partition into `n` groups
# and returns the group id (1 to `n` inclusive). With n = 4, the first quarter
# of the rows gets 1, the second quarter 2, the third 3 and the last 4.
# Here n = 3 over 5 rows per category, giving groups of 2, 2 and 1 rows.

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
ntile_col = sf.ntile(3).over(query_window)
df_query = df_productReveneue.withColumn("ntile", ntile_col)
df_query.show()

+----------+----------+-------+-----+
|   product|  category|revenue|ntile|
+----------+----------+-------+-----+
|      Thin|Cell phone|   6000|    1|
| Very thin|Cell phone|   6000|    1|
|Ultra thin|Cell phone|   5000|    2|
|  Bendable|Cell phone|   3000|    2|
|  Foldable|Cell phone|   3000|    3|
|      Pro2|    Tablet|   6500|    1|
|      Mini|    Tablet|   5500|    1|
|       Pro|    Tablet|   4500|    2|
|       Big|    Tablet|   2500|    2|
|    Normal|    Tablet|   1500|    3|
+----------+----------+-------+-----+



In [9]:
# sf.row_number()
# Window function: sequential number, starting at 1, within each window
# partition. Tied rows still get distinct numbers; the ordering below does
# not say which of two equal revenues comes first.

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
row_number_col = sf.row_number().over(query_window)
df_query = df_productReveneue.withColumn("row_number", row_number_col)
df_query.show()

+----------+----------+-------+----------+
|   product|  category|revenue|row_number|
+----------+----------+-------+----------+
|      Thin|Cell phone|   6000|         1|
| Very thin|Cell phone|   6000|         2|
|Ultra thin|Cell phone|   5000|         3|
|  Bendable|Cell phone|   3000|         4|
|  Foldable|Cell phone|   3000|         5|
|      Pro2|    Tablet|   6500|         1|
|      Mini|    Tablet|   5500|         2|
|       Pro|    Tablet|   4500|         3|
|       Big|    Tablet|   2500|         4|
|    Normal|    Tablet|   1500|         5|
+----------+----------+-------+----------+



# Analytic functions

In [10]:
# sf.cume_dist()
# Window function: cumulative distribution of values within a window
# partition, i.e. the fraction of the partition's rows that come at or before
# the current row in the window ordering (tied rows are counted together,
# so two rows tied for first out of five both get 0.4).

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
cume_dist_col = sf.cume_dist().over(query_window)
df_query = df_productReveneue.withColumn("cume_dist", cume_dist_col)
df_query.show()

+----------+----------+-------+---------+
|   product|  category|revenue|cume_dist|
+----------+----------+-------+---------+
|      Thin|Cell phone|   6000|      0.4|
| Very thin|Cell phone|   6000|      0.4|
|Ultra thin|Cell phone|   5000|      0.6|
|  Bendable|Cell phone|   3000|      1.0|
|  Foldable|Cell phone|   3000|      1.0|
|      Pro2|    Tablet|   6500|      0.2|
|      Mini|    Tablet|   5500|      0.4|
|       Pro|    Tablet|   4500|      0.6|
|       Big|    Tablet|   2500|      0.8|
|    Normal|    Tablet|   1500|      1.0|
+----------+----------+-------+---------+



In [11]:
# sf.first(col, ignorenulls=False)
# Aggregate function: first value in a group.
#
# By default it returns the first value it sees; with ignorenulls=True it
# returns the first non-null value. If every value is null, the result is null.

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
first_product_col = sf.first("product").over(query_window)
df_query = df_productReveneue.withColumn("first", first_product_col)
df_query.show()

+----------+----------+-------+-----+
|   product|  category|revenue|first|
+----------+----------+-------+-----+
|      Thin|Cell phone|   6000| Thin|
| Very thin|Cell phone|   6000| Thin|
|Ultra thin|Cell phone|   5000| Thin|
|  Bendable|Cell phone|   3000| Thin|
|  Foldable|Cell phone|   3000| Thin|
|      Pro2|    Tablet|   6500| Pro2|
|      Mini|    Tablet|   5500| Pro2|
|       Pro|    Tablet|   4500| Pro2|
|       Big|    Tablet|   2500| Pro2|
|    Normal|    Tablet|   1500| Pro2|
+----------+----------+-------+-----+



In [12]:
# sf.last(col, ignorenulls=False)
# Aggregate function: last value in a group.
#
# By default it returns the last value it sees; with ignorenulls=True it
# returns the last non-null value. If every value is null, the result is null.

# NOTE: the result is NOT the last product of the whole partition. When a
# window has an orderBy but no explicit frame, Spark uses a growing frame
# (unbounded preceding up to the current row and its ties), so last() only
# sees rows up to the current one. For the partition-wide last value, add
# .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).
query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
last_product_col = sf.last("product").over(query_window)
df_query = df_productReveneue.withColumn("last", last_product_col)
df_query.show()

+----------+----------+-------+----------+
|   product|  category|revenue|      last|
+----------+----------+-------+----------+
|      Thin|Cell phone|   6000| Very thin|
| Very thin|Cell phone|   6000| Very thin|
|Ultra thin|Cell phone|   5000|Ultra thin|
|  Bendable|Cell phone|   3000|  Foldable|
|  Foldable|Cell phone|   3000|  Foldable|
|      Pro2|    Tablet|   6500|      Pro2|
|      Mini|    Tablet|   5500|      Mini|
|       Pro|    Tablet|   4500|       Pro|
|       Big|    Tablet|   2500|       Big|
|    Normal|    Tablet|   1500|    Normal|
+----------+----------+-------+----------+



In [13]:
# sf.lag(col, count=1, default=None)
# Window function: value taken `count` rows before the current row, or
# `default` when fewer than `count` rows precede it. A count of 1 yields the
# previous row's value at every position in the window partition.
#
# Equivalent to the LAG function in SQL.
#
# :param col: name of column or expression
# :param count: number of rows to look back
# :param default: value returned when no such row exists

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
previous_revenue_col = sf.lag("revenue", 1).over(query_window)
df_query = df_productReveneue.withColumn("lag", previous_revenue_col)
df_query.show()

+----------+----------+-------+----+
|   product|  category|revenue| lag|
+----------+----------+-------+----+
|      Thin|Cell phone|   6000|null|
| Very thin|Cell phone|   6000|6000|
|Ultra thin|Cell phone|   5000|6000|
|  Bendable|Cell phone|   3000|5000|
|  Foldable|Cell phone|   3000|3000|
|      Pro2|    Tablet|   6500|null|
|      Mini|    Tablet|   5500|6500|
|       Pro|    Tablet|   4500|5500|
|       Big|    Tablet|   2500|4500|
|    Normal|    Tablet|   1500|2500|
+----------+----------+-------+----+



In [14]:
# sf.lead(col, count=1, default=None)
# Window function: value taken `count` rows after the current row, or
# `default` when fewer than `count` rows follow it. A count of 1 yields the
# next row's value at every position in the window partition.
#
# Equivalent to the LEAD function in SQL.
#
# :param col: name of column or expression
# :param count: number of rows to look ahead
# :param default: value returned when no such row exists

query_window = Window.partitionBy("category").orderBy(sf.desc("revenue"))
next_revenue_col = sf.lead("revenue", 1).over(query_window)
df_query = df_productReveneue.withColumn("lead", next_revenue_col)
df_query.show()

+----------+----------+-------+----+
|   product|  category|revenue|lead|
+----------+----------+-------+----+
|      Thin|Cell phone|   6000|6000|
| Very thin|Cell phone|   6000|5000|
|Ultra thin|Cell phone|   5000|3000|
|  Bendable|Cell phone|   3000|3000|
|  Foldable|Cell phone|   3000|null|
|      Pro2|    Tablet|   6500|5500|
|      Mini|    Tablet|   5500|4500|
|       Pro|    Tablet|   4500|2500|
|       Big|    Tablet|   2500|1500|
|    Normal|    Tablet|   1500|null|
+----------+----------+-------+----+



In [15]:
# Window with a partition but no ordering: each aggregate is computed over
# the whole category, so every row of a category carries the same values.
query_window = Window.partitionBy("category")
df_query = (
    df_productReveneue
    .withColumn("max_revenue", sf.max("revenue").over(query_window))
    .withColumn("avg_revenue", sf.avg("revenue").over(query_window))
    .withColumn("total_revenue", sf.sum("revenue").over(query_window))
)
df_query.show()

+----------+----------+-------+-----------+-----------+-------------+
|   product|  category|revenue|max_revenue|avg_revenue|total_revenue|
+----------+----------+-------+-----------+-----------+-------------+
|      Thin|Cell phone|   6000|       6000|     4600.0|        23000|
|Ultra thin|Cell phone|   5000|       6000|     4600.0|        23000|
| Very thin|Cell phone|   6000|       6000|     4600.0|        23000|
|  Bendable|Cell phone|   3000|       6000|     4600.0|        23000|
|  Foldable|Cell phone|   3000|       6000|     4600.0|        23000|
|    Normal|    Tablet|   1500|       6500|     4100.0|        20500|
|      Mini|    Tablet|   5500|       6500|     4100.0|        20500|
|       Big|    Tablet|   2500|       6500|     4100.0|        20500|
|       Pro|    Tablet|   4500|       6500|     4100.0|        20500|
|      Pro2|    Tablet|   6500|       6500|     4100.0|        20500|
+----------+----------+-------+-----------+-----------+-------------+



In [16]:
# Adding orderBy turns the same aggregates into running ones: each row sees
# the rows from the start of its category up to itself. Rows with equal
# revenue are included together, so tied rows show identical values.
query_window = Window.partitionBy("category").orderBy("revenue")
running_aggregates = [
    ("max_revenue", sf.max),
    ("avg_revenue", sf.avg),
    ("total_revenue", sf.sum),
]
df_query = df_productReveneue
for column_name, aggregate in running_aggregates:
    df_query = df_query.withColumn(column_name, aggregate("revenue").over(query_window))
df_query.show()

+----------+----------+-------+-----------+------------------+-------------+
|   product|  category|revenue|max_revenue|       avg_revenue|total_revenue|
+----------+----------+-------+-----------+------------------+-------------+
|  Bendable|Cell phone|   3000|       3000|            3000.0|         6000|
|  Foldable|Cell phone|   3000|       3000|            3000.0|         6000|
|Ultra thin|Cell phone|   5000|       5000|3666.6666666666665|        11000|
|      Thin|Cell phone|   6000|       6000|            4600.0|        23000|
| Very thin|Cell phone|   6000|       6000|            4600.0|        23000|
|    Normal|    Tablet|   1500|       1500|            1500.0|         1500|
|       Big|    Tablet|   2500|       2500|            2000.0|         4000|
|       Pro|    Tablet|   4500|       4500|2833.3333333333335|         8500|
|      Mini|    Tablet|   5500|       5500|            3500.0|        14000|
|      Pro2|    Tablet|   6500|       6500|            4100.0|        20500|

# ROWS vs RANGE frames

In [17]:
# ROWS frame: bounds are row offsets from the current row. Here the frame is
# the previous row, the current row and the next row.
query_window = (
    Window
    .partitionBy("category")
    .orderBy("revenue")
    .rowsBetween(-1, 1)
)
neighbour_max_col = sf.max("revenue").over(query_window)
df_query = df_productReveneue.withColumn("max_revenue", neighbour_max_col)
df_query.show()

+----------+----------+-------+-----------+
|   product|  category|revenue|max_revenue|
+----------+----------+-------+-----------+
|  Bendable|Cell phone|   3000|       3000|
|  Foldable|Cell phone|   3000|       5000|
|Ultra thin|Cell phone|   5000|       6000|
|      Thin|Cell phone|   6000|       6000|
| Very thin|Cell phone|   6000|       6000|
|    Normal|    Tablet|   1500|       2500|
|       Big|    Tablet|   2500|       4500|
|       Pro|    Tablet|   4500|       5500|
|      Mini|    Tablet|   5500|       6500|
|      Pro2|    Tablet|   6500|       6500|
+----------+----------+-------+-----------+



In [18]:
# RANGE frame: bounds are offsets on the ordering value, not row counts. Here
# the frame holds the rows whose revenue lies between the current row's
# revenue and that revenue plus 1000.
query_window = (
    Window
    .partitionBy("category")
    .orderBy("revenue")
    .rangeBetween(Window.currentRow, 1000)
)
range_max_col = sf.max("revenue").over(query_window)
df_query = df_productReveneue.withColumn("max_revenue", range_max_col)
df_query.show()

+----------+----------+-------+-----------+
|   product|  category|revenue|max_revenue|
+----------+----------+-------+-----------+
|  Bendable|Cell phone|   3000|       3000|
|  Foldable|Cell phone|   3000|       3000|
|Ultra thin|Cell phone|   5000|       6000|
|      Thin|Cell phone|   6000|       6000|
| Very thin|Cell phone|   6000|       6000|
|    Normal|    Tablet|   1500|       2500|
|       Big|    Tablet|   2500|       2500|
|       Pro|    Tablet|   4500|       5500|
|      Mini|    Tablet|   5500|       6500|
|      Pro2|    Tablet|   6500|       6500|
+----------+----------+-------+-----------+



In [19]:
# Row-based running total: sum from the first row of the category up to and
# including the current row. Unlike the default frame, tied rows are not
# lumped together.
query_window = (
    Window
    .partitionBy("category")
    .orderBy("revenue")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
accum_revenue_col = sf.sum("revenue").over(query_window)
df_query = df_productReveneue.withColumn("accum_revenue", accum_revenue_col)
df_query.show()

+----------+----------+-------+-------------+
|   product|  category|revenue|accum_revenue|
+----------+----------+-------+-------------+
|  Bendable|Cell phone|   3000|         3000|
|  Foldable|Cell phone|   3000|         6000|
|Ultra thin|Cell phone|   5000|        11000|
|      Thin|Cell phone|   6000|        17000|
| Very thin|Cell phone|   6000|        23000|
|    Normal|    Tablet|   1500|         1500|
|       Big|    Tablet|   2500|         4000|
|       Pro|    Tablet|   4500|         8500|
|      Mini|    Tablet|   5500|        14000|
|      Pro2|    Tablet|   6500|        20500|
+----------+----------+-------+-------------+

