{ "cells": [ { "cell_type": "markdown", "id": "8a2fe751", "metadata": {}, "source": [ "# ⭐ Scaling Machine Learning in Three Week course ⭐\n", "\n", "## Intro to PySpark\n", "\n", "This exercise covers:\n", " * RDD\n", " * DataFrame\n", " * Spark SQL\n", " * Loading data from file\n", " * Getting a feel for the data\n", "\n", "\n", "\n", "This exercise is part of the [Scaling Machine Learning with Spark book](https://learning.oreilly.com/library/view/scaling-machine-learning/9781098106812/)\n", "available on the O'Reilly platform or on [Amazon](https://amzn.to/3WgHQvd).\n" ] }, { "cell_type": "code", "execution_count": 33, "id": "28acad89", "metadata": {}, "outputs": [], "source": [ "# Create SparkSession from builder\n", "import pyspark\n", "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.master(\"local[1]\") \\\n", " .appName('Scalling_ml_with_spark') \\\n", " .getOrCreate()\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "5a0e61ff", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import *\n", "from pyspark.sql.functions import *" ] }, { "cell_type": "markdown", "id": "afdb2330", "metadata": {}, "source": [ "## Get started with PySpark RDD\n", "\n", "**RDD transformations** – Transformations are lazy operations. 
When you run a transformation (for example, ```map``` or ```filter```), Spark does not modify the current RDD; the operation returns a new RDD.\n", "\n", "**RDD actions** – Actions are operations that trigger computation and return results to the driver.\n" ] }, { "cell_type": "code", "execution_count": 4, "id": "ee32f489", "metadata": {}, "outputs": [], "source": [ "\n", "# Create an RDD with parallelize\n", "dataList = [(\"scaling\", 20000), (\"machine-learning\", 100000), (\"pyspark\", 3000)]\n", "rdd = spark.sparkContext.parallelize(dataList)\n" ] }, { "cell_type": "code", "execution_count": 10, "id": "28b945f1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('scaling', 20000)]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.take(1)" ] }, { "cell_type": "markdown", "id": "ffa761f2", "metadata": {}, "source": [ " * Find interesting operations that you can do on top of an RDD and share them in the chat!" ] }, { "cell_type": "markdown", "id": "f448c834", "metadata": {}, "source": [ "## Get started with DataFrame" ] }, { "cell_type": "code", "execution_count": 5, "id": "586c38ee", "metadata": {}, "outputs": [], "source": [ "\n", "data = [('Adi','','Polak','1991-04-01','M',3000),\n", " ('Michael','Smith','','2000-05-19','M',4000),\n", " ('Robert','','Jhonie','1978-09-05','M',4000),\n", " ('Maria','Anne','Swiss','1967-12-01','F',4000),\n", " ('Jen','Condo','Brown','1980-02-17','F',-1)\n", "]\n", "\n", "columns = [\"firstname\",\"middlename\",\"lastname\",\"dob\",\"gender\",\"salary\"]\n", "df = spark.createDataFrame(data=data, schema=columns)\n" ] }, { "cell_type": "code", "execution_count": 6, "id": "9a6bc85b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- firstname: string (nullable = true)\n", " |-- middlename: string (nullable = true)\n", " |-- lastname: string (nullable = true)\n", " |-- dob: string (nullable = true)\n", " |-- gender: string (nullable = true)\n", " |-- salary: long 
(nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": 7, "id": "08f392df", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+----------+--------+----------+------+------+\n", "|firstname|middlename|lastname| dob|gender|salary|\n", "+---------+----------+--------+----------+------+------+\n", "| Adi| | Polak|1991-04-01| M| 3000|\n", "| Michael| Smith| |2000-05-19| M| 4000|\n", "| Robert| | Jhonie|1978-09-05| M| 4000|\n", "| Maria| Anne| Swiss|1967-12-01| F| 4000|\n", "| Jen| Condo| Brown|1980-02-17| F| -1|\n", "+---------+----------+--------+----------+------+------+\n", "\n" ] } ], "source": [ "df.show()" ] }, { "cell_type": "markdown", "id": "9038ce1a", "metadata": {}, "source": [ "✅ **Task :** \n", "\n", "Explore the functionality of the RDD and the DataFrame. What is the difference between them?\n", "\n", "How can you convert an RDD to a DataFrame? Name it df2.\n", "\n", "\n", "\n", "
Hint\n", "

\n", " \n", "Try out the following python code snippet:\n", " \n", "```python\n", "df2 = rdd.toDF()\n", "df2.printSchema()\n", "df2.show(truncate=False)\n", " \n", "```\n", "

\n", "
\n", "\n" ] }, { "cell_type": "code", "execution_count": 15, "id": "5fcaa005", "metadata": {}, "outputs": [], "source": [ "# Your answer goes here.\n" ] }, { "attachments": { "image.png": { "image/png": "" } }, "cell_type": "markdown", "id": "36e185a5", "metadata": {}, "source": [ "Now that you have a DataFrame, let's look at it and give names to the default columns ```_1``` and ```_2```.\n", "\n", "![image.png](attachment:image.png)" ] }, { "cell_type": "code", "execution_count": 13, "id": "eff44d67", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- id: long (nullable = true)\n", "\n", "+----------------+------+\n", "|name |id |\n", "+----------------+------+\n", "|scaling |20000 |\n", "|machine-learning|100000|\n", "|pyspark |3000 |\n", "+----------------+------+\n", "\n" ] } ], "source": [ "deptColumns = [\"name\",\"id\"]\n", "df2 = rdd.toDF(deptColumns)\n", "df2.printSchema()\n", "df2.show(truncate=False)" ] }, { "cell_type": "markdown", "id": "c70f6e2b", "metadata": {}, "source": [ "The book mentions a critical best practice around structs. Data in Spark is bound to a specific struct, so it is often better not to hardcode the struct and to keep it in a configuration file instead. 
Let's see how defining an explicit struct looks and feels with a code snippet:" ] }, { "cell_type": "code", "execution_count": 14, "id": "c18d9399", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- id: string (nullable = true)\n", "\n", "+----------------+------+\n", "|name |id |\n", "+----------------+------+\n", "|scaling |20000 |\n", "|machine-learning|100000|\n", "|pyspark |3000 |\n", "+----------------+------+\n", "\n" ] } ], "source": [ "from pyspark.sql.types import StructType, StructField, StringType\n", "\n", "deptSchema = StructType([ \n", " StructField('name', StringType(), True),\n", " StructField('id', StringType(), True)\n", "])\n", "\n", "deptDF1 = spark.createDataFrame(rdd, schema=deptSchema)\n", "deptDF1.printSchema()\n", "deptDF1.show(truncate=False)" ] }, { "cell_type": "markdown", "id": "20fd8dea", "metadata": {}, "source": [ "## Temp views and running SQL" ] }, { "cell_type": "code", "execution_count": 11, "id": "8bf93b74", "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- firstname: string (nullable = true)\n", " |-- middlename: string (nullable = true)\n", " |-- lastname: string (nullable = true)\n", " |-- dob: string (nullable = true)\n", " |-- gender: string (nullable = true)\n", " |-- salary: long (nullable = true)\n", "\n", "+---------+----------+--------+----------+------+------+\n", "|firstname|middlename|lastname| dob|gender|salary|\n", "+---------+----------+--------+----------+------+------+\n", "| Adi| | Polak|1991-04-01| M| 3000|\n", "| Michael| Smith| |2000-05-19| M| 4000|\n", "| Robert| | Jhonie|1978-09-05| M| 4000|\n", "| Maria| Anne| Swiss|1967-12-01| F| 4000|\n", "| Jen| Condo| Brown|1980-02-17| F| -1|\n", "+---------+----------+--------+----------+------+------+\n", "\n" ] } ], "source": [ "df.createOrReplaceTempView(\"PERSON_DATA\")\n", "df2 = spark.sql(\"SELECT * from 
PERSON_DATA\")\n", "df2.printSchema()\n", "df2.show()" ] }, { "cell_type": "markdown", "id": "b95e5a94", "metadata": {}, "source": [ "✅ **Task :** \n", "\n", "Now that you know you can run SQL-like operations using ```spark.sql```, group the person data by gender using a SQL query.\n", "\n", "\n", "Share how you went about it! Use the hint only if necessary. The goal is for you to experiment with the PySpark API.\n", "\n", "\n", "\n", "
Hint\n", "

\n", " \n", "Try out the following python code snippet:\n", " \n", "```python\n", "groupDF = spark.sql(\"SELECT gender, count(*) from PERSON_DATA group by gender\")\n", "groupDF.show()\n", " \n", "```\n", "

\n", "
\n" ] }, { "cell_type": "code", "execution_count": 20, "id": "b621da1b", "metadata": {}, "outputs": [], "source": [ "# Your answer goes here.\n", "\n" ] }, { "cell_type": "markdown", "id": "764205ba", "metadata": {}, "source": [ "## ⭐ - Time to start with our Twitter data journey ⭐ \n", "Now that you feel a bit more comfortable, it is a good time to level up and start working with our Twitter data.\n", "This exercise continues into the week 2 class.\n", "\n", "In this part, you will:\n", "\n", "• Task 1: Load data into a Spark DataFrame (DF)\n", "\n", "• Task 2: Immutability\n", "\n", "• Task 3: Get a feel for the data" ] }, { "cell_type": "markdown", "id": "ba8bbc7a", "metadata": {}, "source": [ "## ✅ **Task :** Load the data" ] }, { "cell_type": "code", "execution_count": 30, "id": "95884953", "metadata": {}, "outputs": [], "source": [ "df = spark.read.csv('../datasets/bot_data.csv', header=True)" ] }, { "cell_type": "code", "execution_count": 31, "id": "9d7e98a7", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "2840" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Check the size of the data (number of rows):\n", "df.count()" ] }, { "cell_type": "markdown", "id": "fb8d8118", "metadata": {}, "source": [ "## ✅ **Task :** Immutability\n", "A DataFrame in Spark is **immutable**.\n", "\n", "What does that mean? It means that every operation we perform on a DataFrame doesn't change the DataFrame itself!\n", "\n", "Instead, it creates a new DataFrame. Run the next commands and get a feel for working with DataFrames.\n", "\n", "Don't worry if you don't understand everything completely; the next exercises go deeper into it." ] }, { "cell_type": "code", "execution_count": 25, "id": "a88b81b8", "metadata": {}, "outputs": [], "source": [ "df_new = df.select('bot')" ] }, { "cell_type": "code", "execution_count": 24, "id": "88bc24b5", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " id id_str screen_name \\\n", "0 8.16E+17 \"\"\"815745789754417152\"\"\" \"\"\"HoustonPokeMap\"\"\" \n", "1 4843621225 4843621225 kernyeahx \n", "\n", " location \\\n", "0 \"\"\"Houston \n", "1 Templeville town, MD, USA \n", "\n", " description \\\n", "0 TX\"\"\" \n", "1 From late 2014 Socium Marketplace will make sh... \n", "\n", " url \\\n", "0 \"\"\"Rare and strong PokŽmon in Houston \n", "1 None \n", "\n", " followers_count \\\n", "0 TX. See more PokŽmon at https://t.co/dnWuDbFR... \n", "1 1 \n", "\n", " friends_count listed_count created_at favourites_count \\\n", "0 \"\"\"https://t.co/dnWuDbFRkt\"\"\" 1291 0 10 \n", "1 349 0 2/1/2016 7:37 38 \n", "\n", " verified statuses_count lang status \\\n", "0 \"\"\"Mon Jan 02 02:25:26 +0000 2017\"\"\" 0 FALSE 78554 \n", "1 FALSE 31 en null \n", "\n", " default_profile default_profile_image \\\n", "0 \"\"\"en\"\"\" \"{ \"\"created_at\"\": \"\"Sun Mar 12 15:44:04 ... \n", "1 TRUE FALSE \n", "\n", " has_extended_profile name \\\n", "0 \"\"id\"\": 840951532543737900 \"\"id_str\"\": \"\"840951532543737856\"\" \n", "1 FALSE Keri Nelson \n", "\n", " bot \n", "0 \"\"text\"\": \"\"[Southeast Houston] Chansey ... \n", "1 1 " ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.limit(2) .toPandas ()" ] }, { "cell_type": "code", "execution_count": null, "id": "004a9751", "metadata": {}, "outputs": [], "source": [ "df_new = df.select('bot')" ] }, { "cell_type": "code", "execution_count": null, "id": "2cd36188", "metadata": {}, "outputs": [], "source": [ "df_new.limit(2) .toPandas ()" ] }, { "cell_type": "markdown", "id": "40206a90", "metadata": {}, "source": [ "You probably notice that ```df_new```, and ```df``` are different! They are pointers to two different DataFrames.\n", "\n", "Try the next command:\n", "\n" ] }, { "cell_type": "code", "execution_count": 27, "id": "6b45da78", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " bot\n", "0 \"\"text\"\": \"\"[Southeast Houston] Chansey ...\n", "1 1" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.select('bot').limit(2) .toPandas ()" ] }, { "cell_type": "markdown", "id": "327eac58", "metadata": {}, "source": [ "The last ```toPandas ()``` commands printed different results,\n", "\n", "## why?\n", "```df.select('bot')``` functionality returns pointer to a new immutable DataFrame! AHA!\n", "\n", "Let's have ```df_new``` and ```df``` point to the same DataFrame. By doing this, we release the pointer from df_new and it can be erased from memory.\n", "\n", "If we wish to have access to it again, we will need to rerun the logic. Bear that in mind for working with ```Apache Spark```." ] }, { "cell_type": "code", "execution_count": null, "id": "b7b29049", "metadata": {}, "outputs": [], "source": [ "df_new = df\n", "df_new.limit(2) .toPandas()" ] }, { "cell_type": "markdown", "id": "b72683de", "metadata": {}, "source": [ "By the Way! ```limit(2)``` returns a pointer to a DataFrame with 2 rows.\n", "\n", "Interesting! This is what **Immutability** means!!" ] }, { "cell_type": "markdown", "id": "ec434e22", "metadata": {}, "source": [ "## ✅ **Task :** Get a feel for the data\n", "\n", "Query the DF using SQL to get a feel for the data" ] }, { "cell_type": "markdown", "id": "3ef114f7", "metadata": {}, "source": [ "Look at 2 records from the DataFrame to understand the values better before filter: use take() function\n", "\n", "df.take(insert an integer here)" ] }, { "cell_type": "code", "execution_count": 32, "id": "6a8398db", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(id='8.16E+17', id_str='\"\"\"815745789754417152\"\"\"', screen_name='\"\"\"HoustonPokeMap\"\"\"', location='\"\"\"Houston', description=' TX\"\"\"', url='\"\"\"Rare and strong PokŽmon in Houston', followers_count=' TX. 
See more PokŽmon at https://t.co/dnWuDbFRkt\"\"\"', friends_count='\"\"\"https://t.co/dnWuDbFRkt\"\"\"', listed_count='1291', created_at='0', favourites_count='10', verified='\"\"\"Mon Jan 02 02:25:26 +0000 2017\"\"\"', statuses_count='0', lang='FALSE', status='78554', default_profile='\"\"\"en\"\"\"', default_profile_image='\"{ \"\"created_at\"\": \"\"Sun Mar 12 15:44:04 +0000 2017\"\"', has_extended_profile=' \"\"id\"\": 840951532543737900', name=' \"\"id_str\"\": \"\"840951532543737856\"\"', bot=' \"\"text\"\": \"\"[Southeast Houston] Chansey (F) (IV: 73%) until 11:11:37AM at 2511 Winbern St https://t.co/HYRIyq4mF7 https://t.co/bydOOKsEEI\"\"'),\n", " Row(id='4843621225', id_str='4843621225', screen_name='kernyeahx', location='Templeville town, MD, USA', description='From late 2014 Socium Marketplace will make shopping for fundamental business services more simple, more cost effective and more about you.', url=None, followers_count='1', friends_count='349', listed_count='0', created_at='2/1/2016 7:37', favourites_count='38', verified='FALSE', statuses_count='31', lang='en', status='null', default_profile='TRUE', default_profile_image='FALSE', has_extended_profile='FALSE', name='Keri Nelson', bot='1')]" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "\n", "df.take(2)" ] }, { "cell_type": "markdown", "id": "5f7a8a11", "metadata": {}, "source": [ "Check out the schema structure of the DataFrame.\n", "\n", "What are the value types? Use:\n", "\n", "```df.printSchema()```" ] }, { "cell_type": "code", "execution_count": null, "id": "133234bb", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "410bdfc4", "metadata": {}, "source": [ "Run the next function:\n", "\n", "> ```df.limit(25).toPandas()```\n", "What happened here? 
The ```toPandas``` function took the Spark DataFrame and converted it into a pandas DataFrame, collecting all of its rows into the driver's memory.\n", "\n", "### Run this function only on small data sets and when exploring the data.\n", "Otherwise, you might run into an out-of-memory error!" ] }, { "cell_type": "markdown", "id": "3ce2c37b", "metadata": {}, "source": [ "## Well Done! 👏👏👏\n", "## You just finished: Intro to PySpark\n", "## Next Week: Feature Engineering and Marshaling the data" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.4" } }, "nbformat": 4, "nbformat_minor": 5 }