{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Welcome to\n", " ____ __\n", " / __/__ ___ _____/ /__\n", " _\\ \\/ _ \\/ _ `/ __/ '_/\n", " /__ / .__/\\_,_/_/ /_/\\_\\ version 2.4.0\n", " /_/\n", "\n", "Using Python version 2.7.15 (default, Dec 14 2018 19:04:19)\n", "SparkSession available as 'spark'.\n", "local-1560753877291\n" ] } ], "source": [ "import os, sys, glob, datetime\n", "\n", "# specify spark version, python version\n", "spark_home = \"/home/zero/spark-2.4.0-bin-hadoop2.7\" # MODIFY THIS\n", "python_path =\"/apps/anaconda2/bin/python\"\n", "# set environment variables\n", "os.environ['SPARK_HOME'] = spark_home\n", "os.environ['PYSPARK_PYTHON'] = python_path\n", "os.environ['SPARK_LOCAL_IP'] = \"127.0.0.1\"\n", "\n", "def setup_spark_env(app_name):\n", " # set environment variables\n", " spark_python = os.path.join(spark_home, 'python')\n", " py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]\n", " sys.path[:0] = [spark_python, py4j]\n", " # specify Spark application parameters\n", " PYSPARK_SUBMIT_ARGS=\"--master local[2]\"\n", "\n", " os.environ['PYSPARK_SUBMIT_ARGS'] = (PYSPARK_SUBMIT_ARGS \n", " + \" --name '%s_%s'\"%(app_name, datetime.datetime.now().strftime(\"%Y%m%d %H:%M\")) \n", " + \" pyspark-shell\") \n", " return\n", "\n", "#\n", "setup_spark_env(\"your_name_test_spark\") # MODIFY THIS\n", "# launching PySpark application\n", "execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))\n", "sc.setLogLevel('ERROR')\n", "print(\"{}\".format(sc.applicationId))" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Row\n", "from pyspark.sql.types import *\n", "import pyspark.sql.functions as sf\n", "from pyspark.sql.window import Window\n", "import numpy as np\n", "import pandas as pd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create sample data" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
productcategoryrevenue
0ThinCell phone6000
1NormalTablet1500
2MiniTablet5500
3Ultra thinCell phone5000
4Very thinCell phone6000
5BigTablet2500
6BendableCell phone3000
7FoldableCell phone3000
8ProTablet4500
9Pro2Tablet6500
\n", "
" ], "text/plain": [ " product category revenue\n", "0 Thin Cell phone 6000\n", "1 Normal Tablet 1500\n", "2 Mini Tablet 5500\n", "3 Ultra thin Cell phone 5000\n", "4 Very thin Cell phone 6000\n", "5 Big Tablet 2500\n", "6 Bendable Cell phone 3000\n", "7 Foldable Cell phone 3000\n", "8 Pro Tablet 4500\n", "9 Pro2 Tablet 6500" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = [\n", " [\"Thin\", \"Cell phone\", 6000],\n", " [\"Normal\", \"Tablet\", 1500],\n", " [\"Mini\", \"Tablet\", 5500],\n", " [\"Ultra thin\", \"Cell phone\", 5000],\n", " [\"Very thin\", \"Cell phone\", 6000],\n", " [\"Big\", \"Tablet\", 2500],\n", " [\"Bendable\", \"Cell phone\", 3000],\n", " [\"Foldable\", \"Cell phone\", 3000],\n", " [\"Pro\", \"Tablet\", 4500], \n", " [\"Pro2\", \"Tablet\", 6500], \n", "]\n", "\n", "productReveneue = pd.DataFrame(data, columns=[\"product\", \"category\", \"revenue\"])\n", "productReveneue.head(10)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+\n", "| product| category|revenue|\n", "+----------+----------+-------+\n", "| Thin|Cell phone| 6000|\n", "| Normal| Tablet| 1500|\n", "| Mini| Tablet| 5500|\n", "|Ultra thin|Cell phone| 5000|\n", "| Very thin|Cell phone| 6000|\n", "| Big| Tablet| 2500|\n", "| Bendable|Cell phone| 3000|\n", "| Foldable|Cell phone| 3000|\n", "| Pro| Tablet| 4500|\n", "| Pro2| Tablet| 6500|\n", "+----------+----------+-------+\n", "\n" ] } ], "source": [ "# convert to Spark Dataframe\n", "df_productReveneue = sqlContext.createDataFrame(productReveneue)\n", "df_productReveneue.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Ranking functions" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----------+\n", "| product| category|revenue|dense_rank|\n", "+----------+----------+-------+----------+\n", "| Bendable|Cell phone| 3000| 1|\n", "| Foldable|Cell phone| 3000| 1|\n", "|Ultra thin|Cell phone| 5000| 2|\n", "| Thin|Cell phone| 6000| 3|\n", "| Very thin|Cell phone| 6000| 3|\n", "| Normal| Tablet| 1500| 1|\n", "| Big| Tablet| 2500| 2|\n", "| Pro| Tablet| 4500| 3|\n", "| Mini| Tablet| 5500| 4|\n", "| Pro2| Tablet| 6500| 5|\n", "+----------+----------+-------+----------+\n", "\n" ] } ], "source": [ "# Signature: sf.dense_rank()\n", "# Docstring:\n", "# Window function: returns the rank of rows within a window partition, without any gaps.\n", "\n", "# The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking\n", "# sequence when there are ties. That is, if you were ranking a competition using dense_rank\n", "# and had three people tie for second place, you would say that all three were in second\n", "# place and that the next person came in third. Rank would give me sequential numbers, making\n", "# the person that came in third place (after the ties) would register as coming in fifth.\n", "\n", "# This is equivalent to the DENSE_RANK function in SQL.\n", "query_window = Window.partitionBy(\"category\").orderBy(\"revenue\")\n", "df_query = df_productReveneue.withColumn(\"dense_rank\", sf.dense_rank().over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----+\n", "| product| category|revenue|rank|\n", "+----------+----------+-------+----+\n", "| Thin|Cell phone| 6000| 1|\n", "| Very thin|Cell phone| 6000| 1|\n", "|Ultra thin|Cell phone| 5000| 3|\n", "| Bendable|Cell phone| 3000| 4|\n", "| Foldable|Cell phone| 3000| 4|\n", "| Pro2| Tablet| 6500| 1|\n", "| Mini| Tablet| 5500| 2|\n", "| Pro| Tablet| 4500| 3|\n", "| Big| Tablet| 2500| 4|\n", "| Normal| Tablet| 1500| 5|\n", "+----------+----------+-------+----+\n", "\n" ] } ], "source": [ "# Signature: sf.rank()\n", "# Docstring:\n", "# Window function: returns the rank of rows within a window partition.\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"rank\", sf.rank().over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+------------+\n", "| product| category|revenue|percent_rank|\n", "+----------+----------+-------+------------+\n", "| Thin|Cell phone| 6000| 0.0|\n", "| Very thin|Cell phone| 6000| 0.0|\n", "|Ultra thin|Cell phone| 5000| 0.5|\n", "| Bendable|Cell phone| 3000| 0.75|\n", "| Foldable|Cell phone| 3000| 0.75|\n", "| Pro2| Tablet| 6500| 0.0|\n", "| Mini| Tablet| 5500| 0.25|\n", "| Pro| Tablet| 4500| 0.5|\n", "| Big| Tablet| 2500| 0.75|\n", "| Normal| Tablet| 1500| 1.0|\n", "+----------+----------+-------+------------+\n", "\n" ] } ], "source": [ "# Signature: sf.percent_rank()\n", "# Docstring:\n", "# Window function: returns the relative rank (i.e. percentile) of rows within a window partition.\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"percent_rank\", sf.percent_rank().over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----+\n", "| product| category|revenue|ntile|\n", "+----------+----------+-------+-----+\n", "| Thin|Cell phone| 6000| 1|\n", "| Very thin|Cell phone| 6000| 1|\n", "|Ultra thin|Cell phone| 5000| 2|\n", "| Bendable|Cell phone| 3000| 2|\n", "| Foldable|Cell phone| 3000| 3|\n", "| Pro2| Tablet| 6500| 1|\n", "| Mini| Tablet| 5500| 1|\n", "| Pro| Tablet| 4500| 2|\n", "| Big| Tablet| 2500| 2|\n", "| Normal| Tablet| 1500| 3|\n", "+----------+----------+-------+-----+\n", "\n" ] } ], "source": [ "# Signature: sf.ntile(n)\n", "# Docstring:\n", "# Window function: returns the ntile group id (from 1 to `n` inclusive)\n", "# in an ordered window partition. For example, if `n` is 4, the first\n", "# quarter of the rows will get value 1, the second quarter will get 2,\n", "# the third quarter will get 3, and the last quarter will get 4.\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"ntile\", sf.ntile(3).over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----------+\n", "| product| category|revenue|row_number|\n", "+----------+----------+-------+----------+\n", "| Thin|Cell phone| 6000| 1|\n", "| Very thin|Cell phone| 6000| 2|\n", "|Ultra thin|Cell phone| 5000| 3|\n", "| Bendable|Cell phone| 3000| 4|\n", "| Foldable|Cell phone| 3000| 5|\n", "| Pro2| Tablet| 6500| 1|\n", "| Mini| Tablet| 5500| 2|\n", "| Pro| Tablet| 4500| 3|\n", "| Big| Tablet| 2500| 4|\n", "| Normal| Tablet| 1500| 5|\n", "+----------+----------+-------+----------+\n", "\n" ] } ], "source": [ "# Signature: sf.row_number()\n", "# Docstring:\n", "# Window function: returns a sequential number starting at 1 within a window partition.\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"row_number\", sf.row_number().over(query_window))\n", "df_query.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Analytic functions" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+---------+\n", "| product| category|revenue|cume_dist|\n", "+----------+----------+-------+---------+\n", "| Thin|Cell phone| 6000| 0.4|\n", "| Very thin|Cell phone| 6000| 0.4|\n", "|Ultra thin|Cell phone| 5000| 0.6|\n", "| Bendable|Cell phone| 3000| 1.0|\n", "| Foldable|Cell phone| 3000| 1.0|\n", "| Pro2| Tablet| 6500| 0.2|\n", "| Mini| Tablet| 5500| 0.4|\n", "| Pro| Tablet| 4500| 0.6|\n", "| Big| Tablet| 2500| 0.8|\n", "| Normal| Tablet| 1500| 1.0|\n", "+----------+----------+-------+---------+\n", "\n" ] } ], "source": [ "# Signature: sf.cume_dist()\n", "# Docstring:\n", "# Window function: returns the cumulative distribution of values within a window partition,\n", "# i.e. the fraction of rows that are below the current row.\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"cume_dist\", sf.cume_dist().over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----+\n", "| product| category|revenue|first|\n", "+----------+----------+-------+-----+\n", "| Thin|Cell phone| 6000| Thin|\n", "| Very thin|Cell phone| 6000| Thin|\n", "|Ultra thin|Cell phone| 5000| Thin|\n", "| Bendable|Cell phone| 3000| Thin|\n", "| Foldable|Cell phone| 3000| Thin|\n", "| Pro2| Tablet| 6500| Pro2|\n", "| Mini| Tablet| 5500| Pro2|\n", "| Pro| Tablet| 4500| Pro2|\n", "| Big| Tablet| 2500| Pro2|\n", "| Normal| Tablet| 1500| Pro2|\n", "+----------+----------+-------+-----+\n", "\n" ] } ], "source": [ "# Signature: sf.first(col, ignorenulls=False)\n", "# Docstring:\n", "# Aggregate function: returns the first value in a group.\n", "\n", "# The function by default returns the first values it sees. It will return the first non-null\n", "# value it sees when ignoreNulls is set to true. If all values are null, then null is returned.\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"first\", sf.first(\"product\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----------+\n", "| product| category|revenue| last|\n", "+----------+----------+-------+----------+\n", "| Thin|Cell phone| 6000| Very thin|\n", "| Very thin|Cell phone| 6000| Very thin|\n", "|Ultra thin|Cell phone| 5000|Ultra thin|\n", "| Bendable|Cell phone| 3000| Foldable|\n", "| Foldable|Cell phone| 3000| Foldable|\n", "| Pro2| Tablet| 6500| Pro2|\n", "| Mini| Tablet| 5500| Mini|\n", "| Pro| Tablet| 4500| Pro|\n", "| Big| Tablet| 2500| Big|\n", "| Normal| Tablet| 1500| Normal|\n", "+----------+----------+-------+----------+\n", "\n" ] } ], "source": [ "# Signature: sf.last(col, ignorenulls=False)\n", "# Docstring:\n", "# Aggregate function: returns the last value in a group.\n", "\n", "# The function by default returns the last values it sees. It will return the last non-null\n", "# value it sees when ignoreNulls is set to true. If all values are null, then null is returned.\n", "\n", "# NOTE: not reliable\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"last\", sf.last(\"product\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----+\n", "| product| category|revenue| lag|\n", "+----------+----------+-------+----+\n", "| Thin|Cell phone| 6000|null|\n", "| Very thin|Cell phone| 6000|6000|\n", "|Ultra thin|Cell phone| 5000|6000|\n", "| Bendable|Cell phone| 3000|5000|\n", "| Foldable|Cell phone| 3000|3000|\n", "| Pro2| Tablet| 6500|null|\n", "| Mini| Tablet| 5500|6500|\n", "| Pro| Tablet| 4500|5500|\n", "| Big| Tablet| 2500|4500|\n", "| Normal| Tablet| 1500|2500|\n", "+----------+----------+-------+----+\n", "\n" ] } ], "source": [ "# Signature: sf.lag(col, count=1, default=None)\n", "# Docstring:\n", "# Window function: returns the value that is `offset` rows before the current row, and\n", "# `defaultValue` if there is less than `offset` rows before the current row. For example,\n", "# an `offset` of one will return the previous row at any given point in the window partition.\n", "\n", "# This is equivalent to the LAG function in SQL.\n", "\n", "# :param col: name of column or expression\n", "# :param count: number of row to extend\n", "# :param default: default value\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"lag\", sf.lag(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+----+\n", "| product| category|revenue|lead|\n", "+----------+----------+-------+----+\n", "| Thin|Cell phone| 6000|6000|\n", "| Very thin|Cell phone| 6000|5000|\n", "|Ultra thin|Cell phone| 5000|3000|\n", "| Bendable|Cell phone| 3000|3000|\n", "| Foldable|Cell phone| 3000|null|\n", "| Pro2| Tablet| 6500|5500|\n", "| Mini| Tablet| 5500|4500|\n", "| Pro| Tablet| 4500|2500|\n", "| Big| Tablet| 2500|1500|\n", "| Normal| Tablet| 1500|null|\n", "+----------+----------+-------+----+\n", "\n" ] } ], "source": [ "# Signature: sf.lead(col, count=1, default=None)\n", "# Docstring:\n", "# Window function: returns the value that is `offset` rows after the current row, and\n", "# `defaultValue` if there is less than `offset` rows after the current row. For example,\n", "# an `offset` of one will return the next row at any given point in the window partition.\n", "\n", "# This is equivalent to the LEAD function in SQL.\n", "\n", "# :param col: name of column or expression\n", "# :param count: number of row to extend\n", "# :param default: default value\n", "\n", "query_window = Window.partitionBy(\"category\").orderBy(sf.col(\"revenue\").desc())\n", "df_query = df_productReveneue.withColumn(\"lead\", sf.lead(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----------+-----------+-------------+\n", "| product| category|revenue|max_revenue|avg_revenue|total_revenue|\n", "+----------+----------+-------+-----------+-----------+-------------+\n", "| Thin|Cell phone| 6000| 6000| 4600.0| 23000|\n", "|Ultra thin|Cell phone| 5000| 6000| 4600.0| 23000|\n", "| Very thin|Cell phone| 6000| 6000| 4600.0| 23000|\n", "| Bendable|Cell phone| 3000| 6000| 4600.0| 23000|\n", "| Foldable|Cell phone| 3000| 6000| 4600.0| 23000|\n", "| Normal| Tablet| 1500| 6500| 4100.0| 20500|\n", "| Mini| Tablet| 5500| 6500| 4100.0| 20500|\n", "| Big| Tablet| 2500| 6500| 4100.0| 20500|\n", "| Pro| Tablet| 4500| 6500| 4100.0| 20500|\n", "| Pro2| Tablet| 6500| 6500| 4100.0| 20500|\n", "+----------+----------+-------+-----------+-----------+-------------+\n", "\n" ] } ], "source": [ "# populate same result across rows\n", "query_window = Window.partitionBy(\"category\")\n", "df_query = df_productReveneue.withColumn(\"max_revenue\", sf.max(\"revenue\").over(query_window))\n", "df_query = df_query.withColumn(\"avg_revenue\", sf.avg(\"revenue\").over(query_window))\n", "df_query = df_query.withColumn(\"total_revenue\", sf.sum(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----------+------------------+-------------+\n", "| product| category|revenue|max_revenue| avg_revenue|total_revenue|\n", "+----------+----------+-------+-----------+------------------+-------------+\n", "| Bendable|Cell phone| 3000| 3000| 3000.0| 6000|\n", "| Foldable|Cell phone| 3000| 3000| 3000.0| 6000|\n", "|Ultra thin|Cell phone| 5000| 5000|3666.6666666666665| 11000|\n", "| Thin|Cell phone| 6000| 6000| 4600.0| 23000|\n", "| Very thin|Cell phone| 6000| 6000| 4600.0| 23000|\n", "| Normal| Tablet| 1500| 1500| 1500.0| 1500|\n", "| Big| Tablet| 2500| 2500| 2000.0| 4000|\n", "| Pro| Tablet| 4500| 4500|2833.3333333333335| 8500|\n", "| Mini| Tablet| 5500| 5500| 3500.0| 14000|\n", "| Pro2| Tablet| 6500| 6500| 4100.0| 20500|\n", "+----------+----------+-------+-----------+------------------+-------------+\n", "\n" ] } ], "source": [ "# will accumulate if use orderBy\n", "query_window = Window.partitionBy(\"category\").orderBy(\"revenue\")\n", "df_query = df_productReveneue.withColumn(\"max_revenue\", sf.max(\"revenue\").over(query_window))\n", "df_query = df_query.withColumn(\"avg_revenue\", sf.avg(\"revenue\").over(query_window))\n", "df_query = df_query.withColumn(\"total_revenue\", sf.sum(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# ROW vs RANGE frame" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----------+\n", "| product| category|revenue|max_revenue|\n", "+----------+----------+-------+-----------+\n", "| Bendable|Cell phone| 3000| 3000|\n", "| Foldable|Cell phone| 3000| 5000|\n", "|Ultra thin|Cell phone| 5000| 6000|\n", "| Thin|Cell phone| 6000| 6000|\n", "| Very thin|Cell phone| 6000| 6000|\n", "| Normal| Tablet| 1500| 2500|\n", "| Big| Tablet| 2500| 4500|\n", "| Pro| Tablet| 4500| 5500|\n", "| Mini| Tablet| 5500| 6500|\n", "| Pro2| Tablet| 6500| 6500|\n", "+----------+----------+-------+-----------+\n", "\n" ] } ], "source": [ "query_window = Window.partitionBy(\"category\").orderBy(\"revenue\").rowsBetween(-1, 1)\n", "df_query = df_productReveneue.withColumn(\"max_revenue\", sf.max(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-----------+\n", "| product| category|revenue|max_revenue|\n", "+----------+----------+-------+-----------+\n", "| Bendable|Cell phone| 3000| 3000|\n", "| Foldable|Cell phone| 3000| 3000|\n", "|Ultra thin|Cell phone| 5000| 6000|\n", "| Thin|Cell phone| 6000| 6000|\n", "| Very thin|Cell phone| 6000| 6000|\n", "| Normal| Tablet| 1500| 2500|\n", "| Big| Tablet| 2500| 2500|\n", "| Pro| Tablet| 4500| 5500|\n", "| Mini| Tablet| 5500| 6500|\n", "| Pro2| Tablet| 6500| 6500|\n", "+----------+----------+-------+-----------+\n", "\n" ] } ], "source": [ "query_window = Window.partitionBy(\"category\").orderBy(\"revenue\").rangeBetween(0, 1000)\n", "df_query = df_productReveneue.withColumn(\"max_revenue\", sf.max(\"revenue\").over(query_window))\n", "df_query.show()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------+-------+-------------+\n", "| product| category|revenue|accum_revenue|\n", "+----------+----------+-------+-------------+\n", "| Bendable|Cell phone| 3000| 3000|\n", "| Foldable|Cell phone| 3000| 6000|\n", "|Ultra thin|Cell phone| 5000| 11000|\n", "| Thin|Cell phone| 6000| 17000|\n", "| Very thin|Cell phone| 6000| 23000|\n", "| Normal| Tablet| 1500| 1500|\n", "| Big| Tablet| 2500| 4000|\n", "| Pro| Tablet| 4500| 8500|\n", "| Mini| Tablet| 5500| 14000|\n", "| Pro2| Tablet| 6500| 20500|\n", "+----------+----------+-------+-------------+\n", "\n" ] } ], "source": [ "query_window = Window.partitionBy(\"category\").orderBy(\"revenue\").rowsBetween(Window.unboundedPreceding, 0)\n", "df_query = df_productReveneue.withColumn(\"accum_revenue\", sf.sum(\"revenue\").over(query_window))\n", "df_query.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.15" } }, "nbformat": 4, "nbformat_minor": 2 }