{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Tutorial-1: Introduction to PySpark RDD\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import and initialize SparkContext" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.appName(\"tutorial-1\").getOrCreate()\n", "sc = spark.sparkContext" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Parallelized Collections" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = [1, 2, 3, 4, 5]\n", "distData = sc.parallelize(data)\n", "\n", "type(distData)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: Read external data file\n", "### Use SparkContext's textFile function to read in a text file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "textFilePath = './emails.txt'\n", "\n", "emails = ## YOUR CODE GOES HERE ##\n", "type(emails)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generate a list of data, from 1 to 10" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = list(range(1,11))\n", "print(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelize the data with 2 partitions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "numbers = sc.parallelize(data,2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Print RDD" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(numbers)\n", "\n", "print(numbers.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get only even numbers, and collect them" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "numbers.filter(lambda x: x % 2 == 0).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: find emails with hotmail domains" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "emails.filter(## YOUR CODE GOES HERE ##).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Square all the numbers in the list using the map operation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "numbers.map(lambda x: x*2).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use flatMap to apply a function that returns a list and flatten the result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "m = numbers.map(lambda x: [x**2, x**3]).collect()\n", "\n", "fm = numbers.flatMap(lambda x: [x**2, x**3]).collect()\n", "\n", "print(m)\n", "print(fm)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: separate username and domain from all emails\n", "\n", "### eg: marshuang80@gmail.com -> [marshuang80, gmail.com]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Hint use the python split() function\n", "username_domain = emails.map(## YOUR CODE GOES HERE ##)\n", "username_domain.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Key-Value Pairs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "username_domain.keys().collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "username_domain.values().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reduce by key" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = [\"a\", \"b\", \"a\", \"a\", \"b\", \"b\", \"b\", \"b\"]\n", "rdd = sc.parallelize(data)\n", "\n", "pairRDD = rdd.map(lambda x: (x, 1))\n", "\n", "pairRDD.reduceByKey(lambda x,y: x+y).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: count the number of domains with the same username" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# do another mapping operation to make all domains in a list\n", "username_domain = username_domain.map(## YOUR CODE GOES HERE ##)\n", "print(\"** Results from mapping values to list\")\n", "print(username_domain.top(3))\n", "\n", "print(\"\\n** Results from reduceByKey ** \")\n", "username_domain.reduceByKey(## YOUR CODE GOES HERE ##).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Very important to stop Spark" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }